This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a7674fbc2760 feat(schema): Add converter for Spark StructType to 
HoodieSchema (#17475)
a7674fbc2760 is described below

commit a7674fbc276088b8380175a389f868a38addce77
Author: Rahil C <[email protected]>
AuthorDate: Wed Dec 10 20:48:00 2025 -0800

    feat(schema): Add converter for Spark StructType to HoodieSchema (#17475)
    
    * feat(schema): Add converter for Spark StructType to HoodieSchema
    
    * initial attempt at following spark adapter pattern for hoodie schema
    
    * fix spark 3.4 issue
    
    * fix test
    
    * get spark 4 to compile and work
    
    * Reduce code duplication between all spark versions, follow pattern of 
existing avro converter by using private[sql] and spark package 
org.apache.spark.sql.avro
    
    * get spark 4 compiling after refactor
    
    * address tim comments
    
    * address tim test comments
    
    * address tim comment
    
    * address ethan minor comment without refactor
    
    * refactor with ethan recommendations
    
    * fix spark4
    
    * add scala doc comment
---
 .../apache/hudi/HoodieSchemaConversionUtils.scala  | 197 ++++++++
 .../sql/avro/HoodieSparkSchemaConverters.scala     | 246 +++++++++
 .../hudi/TestHoodieSchemaConversionUtils.scala     | 550 +++++++++++++++++++++
 .../spark/sql/adapter/BaseSpark3Adapter.scala      |   2 +-
 .../spark/sql/adapter/BaseSpark4Adapter.scala      |   2 +-
 5 files changed, 995 insertions(+), 2 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
new file mode 100644
index 000000000000..f25c098c683f
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
+import org.apache.hudi.internal.schema.HoodieSchemaException
+import org.apache.spark.sql.avro.HoodieSparkSchemaConverters
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Utilities for converting between HoodieSchema and Spark SQL schemas.
+ *
+ * This object provides high-level conversion methods with utilities for
+ * handling defaults and nullability alignment.
+ */
+object HoodieSchemaConversionUtils {
+
+  /**
+   * Converts HoodieSchema to Catalyst's StructType.
+   *
+   * @param hoodieSchema HoodieSchema to convert
+   * @return Spark StructType corresponding to the HoodieSchema
+   * @throws HoodieSchemaException if conversion fails
+   */
+  def convertHoodieSchemaToStructType(hoodieSchema: HoodieSchema): StructType 
= {
+    try {
+      HoodieSparkSchemaConverters.toSqlType(hoodieSchema) match {
+        case (dataType, _) => dataType.asInstanceOf[StructType]
+      }
+    } catch {
+      case e: Exception => throw new HoodieSchemaException(
+        s"Failed to convert HoodieSchema to StructType: $hoodieSchema", e)
+    }
+  }
+
+  /**
+   * Converts HoodieSchema to Catalyst's DataType (general purpose, not just 
StructType).
+   *
+   * @param hoodieSchema HoodieSchema to convert
+   * @return Spark DataType corresponding to the HoodieSchema
+   * @throws HoodieSchemaException if conversion fails
+   */
+  def convertHoodieSchemaToDataType(hoodieSchema: HoodieSchema): DataType = {
+    try {
+      HoodieSparkSchemaConverters.toSqlType(hoodieSchema) match {
+        case (dataType, _) => dataType
+      }
+    } catch {
+      case e: Exception => throw new HoodieSchemaException(
+        s"Failed to convert HoodieSchema to DataType: $hoodieSchema", e)
+    }
+  }
+
+  /**
+   * Converts StructType to HoodieSchema.
+   *
+   * @param structType Catalyst's StructType or DataType
+   * @param qualifiedName HoodieSchema qualified name (namespace.name format)
+   * @return HoodieSchema corresponding to the Spark DataType
+   * @throws HoodieSchemaException if conversion fails
+   */
+  def convertStructTypeToHoodieSchema(structType: DataType, qualifiedName: 
String): HoodieSchema = {
+    val (namespace, name) = {
+      val parts = qualifiedName.split('.')
+      if (parts.length > 1) {
+        (parts.init.mkString("."), parts.last)
+      } else {
+        ("", parts.head)
+      }
+    }
+    convertStructTypeToHoodieSchema(structType, name, namespace)
+  }
+
+  /**
+   * Converts StructType to HoodieSchema.
+   *
+   * @param structType Catalyst's StructType or DataType
+   * @param structName Schema record name
+   * @param recordNamespace Schema record namespace
+   * @return HoodieSchema corresponding to the Spark DataType
+   * @throws HoodieSchemaException if conversion fails
+   */
+  def convertStructTypeToHoodieSchema(structType: DataType,
+                                      structName: String,
+                                      recordNamespace: String): HoodieSchema = 
{
+    try {
+      HoodieSparkSchemaConverters.toHoodieType(structType, nullable = false, 
structName, recordNamespace)
+    } catch {
+      case e: Exception => throw new HoodieSchemaException(
+        s"Failed to convert struct type to HoodieSchema: $structType", e)
+    }
+  }
+
+  /**
+   * Recursively aligns the nullable property of Spark schema fields with 
HoodieSchema.
+   *
+   * @param sourceSchema Source Spark StructType to align
+   * @param hoodieSchema HoodieSchema to use as source of truth
+   * @return StructType with aligned nullability
+   */
+  def alignFieldsNullability(sourceSchema: StructType, hoodieSchema: 
HoodieSchema): StructType = {
+    val hoodieFieldsMap = hoodieSchema.getFields.asScala.map(f => (f.name(), 
f)).toMap
+
+    val alignedFields = sourceSchema.fields.map { field =>
+      hoodieFieldsMap.get(field.name) match {
+        case Some(hoodieField) =>
+          val alignedField = field.copy(nullable = hoodieField.isNullable)
+
+          field.dataType match {
+            case structType: StructType =>
+              val nestedSchema = hoodieField.schema().getNonNullType
+              if (nestedSchema.getType == HoodieSchemaType.RECORD) {
+                alignedField.copy(dataType = 
alignFieldsNullability(structType, nestedSchema))
+              } else {
+                alignedField
+              }
+
+            case ArrayType(elementType, _) =>
+              val arraySchema = hoodieField.schema().getNonNullType
+              if (arraySchema.getType == HoodieSchemaType.ARRAY) {
+                val elemSchema = arraySchema.getElementType
+                val newElementType = updateElementType(elementType, elemSchema)
+                alignedField.copy(dataType = ArrayType(newElementType, 
elemSchema.isNullable))
+              } else {
+                alignedField
+              }
+
+            case MapType(keyType, valueType, _) =>
+              val mapSchema = hoodieField.schema().getNonNullType
+              if (mapSchema.getType == HoodieSchemaType.MAP) {
+                val valueSchema = mapSchema.getValueType
+                val newValueType = updateElementType(valueType, valueSchema)
+                alignedField.copy(dataType = MapType(keyType, newValueType, 
valueSchema.isNullable))
+              } else {
+                alignedField
+              }
+
+            case _ => alignedField
+          }
+
+        case None => field.copy()
+      }
+    }
+
+    StructType(alignedFields)
+  }
+
+  /**
+   * Recursively updates element types for complex types (arrays, maps, 
structs).
+   */
+  private def updateElementType(dataType: DataType, hoodieSchema: 
HoodieSchema): DataType = {
+    dataType match {
+      case structType: StructType =>
+        if (hoodieSchema.getType == HoodieSchemaType.RECORD) {
+          alignFieldsNullability(structType, hoodieSchema)
+        } else {
+          structType
+        }
+
+      case ArrayType(elemType, _) =>
+        if (hoodieSchema.getType == HoodieSchemaType.ARRAY) {
+          val elemSchema = hoodieSchema.getElementType
+          ArrayType(updateElementType(elemType, elemSchema), 
elemSchema.isNullable)
+        } else {
+          dataType
+        }
+
+      case MapType(keyType, valueType, _) =>
+        if (hoodieSchema.getType == HoodieSchemaType.MAP) {
+          val valueSchema = hoodieSchema.getValueType
+          MapType(keyType, updateElementType(valueType, valueSchema), 
valueSchema.isNullable)
+        } else {
+          dataType
+        }
+
+      case _ => dataType
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
new file mode 100644
index 000000000000..a7c41cfa2517
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.hudi.common.schema.HoodieSchema.TimePrecision
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaField, 
HoodieSchemaType}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.types.Decimal.minBytesForPrecision
+import org.apache.spark.sql.types._
+
+import scala.collection.JavaConverters._
+
+/**
+ * This object contains methods that are used to convert HoodieSchema to Spark 
SQL schemas and vice versa.
+ *
+ * This provides direct conversion between HoodieSchema and Spark DataType
+ * without going through Avro Schema intermediary.
+ *
+ * NOTE: the package of this class is intentionally kept as 
"org.apache.spark.sql.avro" which is similar to the existing
+ * Spark Avro connector's SchemaConverters.scala
+ * 
(https://github.com/apache/spark/blob/master/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala).
+ * The reason for this is so that Spark 3.3 is able to access private spark 
sql type classes like TimestampNTZType.
+ */
+
+@DeveloperApi
+object HoodieSparkSchemaConverters {
+
+  /**
+   * Internal wrapper for SQL data type and nullability.
+   */
+  case class SchemaType(dataType: DataType, nullable: Boolean)
+
+  def toSqlType(hoodieSchema: HoodieSchema): (DataType, Boolean) = {
+    val result = toSqlTypeHelper(hoodieSchema, Set.empty)
+    (result.dataType, result.nullable)
+  }
+
+  def toHoodieType(catalystType: DataType, nullable: Boolean, recordName: 
String, nameSpace: String): HoodieSchema = {
+    val schema = catalystType match {
+      // Primitive types
+      case BooleanType => HoodieSchema.create(HoodieSchemaType.BOOLEAN)
+      case ByteType | ShortType | IntegerType => 
HoodieSchema.create(HoodieSchemaType.INT)
+      case LongType => HoodieSchema.create(HoodieSchemaType.LONG)
+      case DateType => HoodieSchema.createDate()
+      case TimestampType => HoodieSchema.createTimestampMicros()
+      case TimestampNTZType => HoodieSchema.createLocalTimestampMicros()
+      case FloatType => HoodieSchema.create(HoodieSchemaType.FLOAT)
+      case DoubleType => HoodieSchema.create(HoodieSchemaType.DOUBLE)
+      case StringType | _: CharType | _: VarcharType => 
HoodieSchema.create(HoodieSchemaType.STRING)
+      case NullType => HoodieSchema.create(HoodieSchemaType.NULL)
+      case BinaryType => HoodieSchema.create(HoodieSchemaType.BYTES)
+
+      case d: DecimalType =>
+        val fixedSize = minBytesForPrecision(d.precision)
+        val name = nameSpace match {
+          case "" => s"$recordName.fixed"
+          case _ => s"$nameSpace.$recordName.fixed"
+        }
+        HoodieSchema.createDecimal(name, nameSpace, null, d.precision, 
d.scale, fixedSize)
+
+      // Complex types
+      case ArrayType(elementType, containsNull) =>
+        val elementSchema = toHoodieType(elementType, containsNull, 
recordName, nameSpace)
+        HoodieSchema.createArray(elementSchema)
+
+      case MapType(StringType, valueType, valueContainsNull) =>
+        val valueSchema = toHoodieType(valueType, valueContainsNull, 
recordName, nameSpace)
+        HoodieSchema.createMap(valueSchema)
+
+      case st: StructType =>
+        val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName" 
else recordName
+
+        // Check if this might be a union (using heuristic like Avro converter)
+        if (canBeUnion(st)) {
+          val nonNullUnionFieldTypes = st.map { f =>
+            toHoodieType(f.dataType, nullable = false, f.name, childNameSpace)
+          }
+          val unionFieldTypes = if (nullable) {
+            (HoodieSchema.create(HoodieSchemaType.NULL) +: 
nonNullUnionFieldTypes).asJava
+          } else {
+            nonNullUnionFieldTypes.asJava
+          }
+          HoodieSchema.createUnion(unionFieldTypes)
+        } else {
+          // Create record
+          val fields = st.map { f =>
+            val fieldSchema = toHoodieType(f.dataType, f.nullable, f.name, 
childNameSpace)
+            val doc = f.getComment.orNull
+            HoodieSchemaField.of(f.name, fieldSchema, doc)
+          }
+
+          HoodieSchema.createRecord(recordName, nameSpace, null, fields.asJava)
+        }
+
+      case other => throw new IncompatibleSchemaException(s"Unexpected Spark 
DataType: $other")
+    }
+
+    // Wrap with null union if nullable (and not already a union)
+    if (nullable && catalystType != NullType && schema.getType != 
HoodieSchemaType.UNION) {
+      HoodieSchema.createNullable(schema)
+    } else {
+      schema
+    }
+  }
+
+  private def toSqlTypeHelper(hoodieSchema: HoodieSchema, existingRecordNames: 
Set[String]): SchemaType = {
+    hoodieSchema.getType match {
+      // Primitive types
+      case HoodieSchemaType.INT => SchemaType(IntegerType, nullable = false)
+      case HoodieSchemaType.STRING | HoodieSchemaType.ENUM => 
SchemaType(StringType, nullable = false)
+      case HoodieSchemaType.BOOLEAN => SchemaType(BooleanType, nullable = 
false)
+      case HoodieSchemaType.BYTES | HoodieSchemaType.FIXED => 
SchemaType(BinaryType, nullable = false)
+      case HoodieSchemaType.DOUBLE => SchemaType(DoubleType, nullable = false)
+      case HoodieSchemaType.FLOAT => SchemaType(FloatType, nullable = false)
+      case HoodieSchemaType.LONG => SchemaType(LongType, nullable = false)
+      case HoodieSchemaType.NULL => SchemaType(NullType, nullable = true)
+
+      // Logical types
+      case HoodieSchemaType.DATE =>
+        SchemaType(DateType, nullable = false)
+
+      case HoodieSchemaType.TIMESTAMP =>
+        hoodieSchema match {
+          case ts: HoodieSchema.Timestamp if !ts.isUtcAdjusted =>
+            SchemaType(TimestampNTZType, nullable = false)
+          case _ =>
+            SchemaType(TimestampType, nullable = false)
+        }
+
+      case HoodieSchemaType.DECIMAL =>
+        hoodieSchema match {
+          case dec: HoodieSchema.Decimal =>
+            SchemaType(DecimalType(dec.getPrecision, dec.getScale), nullable = 
false)
+          case _ =>
+            throw new IncompatibleSchemaException(
+              s"DECIMAL type must be HoodieSchema.Decimal instance, got: 
${hoodieSchema.getClass}")
+        }
+
+      case HoodieSchemaType.TIME =>
+        hoodieSchema match {
+          case time: HoodieSchema.Time =>
+            time.getPrecision match {
+              case TimePrecision.MILLIS => SchemaType(IntegerType, nullable = 
false)
+              case TimePrecision.MICROS => SchemaType(LongType, nullable = 
false)
+            }
+          case _ =>
+            throw new IncompatibleSchemaException(
+              s"TIME type must be HoodieSchema.Time instance, got: 
${hoodieSchema.getClass}")
+        }
+
+      case HoodieSchemaType.UUID =>
+        SchemaType(StringType, nullable = false)
+
+      // Complex types
+      case HoodieSchemaType.RECORD =>
+        val fullName = hoodieSchema.getFullName
+        if (existingRecordNames.contains(fullName)) {
+          throw new IncompatibleSchemaException(
+            s"""
+               |Found recursive reference in HoodieSchema, which cannot be 
processed by Spark:
+               |$fullName
+             """.stripMargin)
+        }
+        val newRecordNames = existingRecordNames + fullName
+        val fields = hoodieSchema.getFields.asScala.map { f =>
+          val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
+          val metadata = if (f.doc().isPresent) {
+            new MetadataBuilder().putString("comment", f.doc().get()).build()
+          } else {
+            Metadata.empty
+          }
+          StructField(f.name(), schemaType.dataType, schemaType.nullable, 
metadata)
+        }
+        SchemaType(StructType(fields.toSeq), nullable = false)
+
+      case HoodieSchemaType.ARRAY =>
+        val elementSchema = hoodieSchema.getElementType
+        val schemaType = toSqlTypeHelper(elementSchema, existingRecordNames)
+        SchemaType(ArrayType(schemaType.dataType, containsNull = 
schemaType.nullable), nullable = false)
+
+      case HoodieSchemaType.MAP =>
+        val valueSchema = hoodieSchema.getValueType
+        val schemaType = toSqlTypeHelper(valueSchema, existingRecordNames)
+        SchemaType(MapType(StringType, schemaType.dataType, valueContainsNull 
= schemaType.nullable), nullable = false)
+
+      case HoodieSchemaType.UNION =>
+        if (hoodieSchema.isNullable) {
+          // Union with null - extract non-null type and mark as nullable
+          val types = hoodieSchema.getTypes.asScala
+          val remainingTypes = types.filter(_.getType != HoodieSchemaType.NULL)
+          if (remainingTypes.size == 1) {
+            toSqlTypeHelper(remainingTypes.head, 
existingRecordNames).copy(nullable = true)
+          } else {
+            toSqlTypeHelper(HoodieSchema.createUnion(remainingTypes.asJava), 
existingRecordNames)
+              .copy(nullable = true)
+          }
+        } else {
+          // Union without null - handle type promotions and member structs
+          val types = hoodieSchema.getTypes.asScala
+          types.map(_.getType).toSeq match {
+            case Seq(t) =>
+              toSqlTypeHelper(types.head, existingRecordNames)
+            case Seq(t1, t2) if Set(t1, t2) == Set(HoodieSchemaType.INT, 
HoodieSchemaType.LONG) =>
+              SchemaType(LongType, nullable = false)
+            case Seq(t1, t2) if Set(t1, t2) == Set(HoodieSchemaType.FLOAT, 
HoodieSchemaType.DOUBLE) =>
+              SchemaType(DoubleType, nullable = false)
+            case _ =>
+              // Convert to struct with member0, member1, ... fields (like 
Avro union handling)
+              val fields = types.zipWithIndex.map {
+                case (s, i) =>
+                  val schemaType = toSqlTypeHelper(s, existingRecordNames)
+                  StructField(s"member$i", schemaType.dataType, nullable = 
true)
+              }
+              SchemaType(StructType(fields.toSeq), nullable = false)
+          }
+        }
+
+      case other => throw new IncompatibleSchemaException(s"Unsupported 
HoodieSchemaType: $other")
+    }
+  }
+
+  private def canBeUnion(st: StructType): Boolean = {
+    st.fields.length > 0 &&
+      st.forall { f =>
+        f.name.matches("member\\d+") && f.nullable
+      }
+  }
+}
+
+private[avro] class IncompatibleSchemaException(msg: String, ex: Throwable = 
null) extends Exception(msg, ex)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSchemaConversionUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSchemaConversionUtils.scala
new file mode 100644
index 000000000000..0bda52b13481
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSchemaConversionUtils.scala
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaField, 
HoodieSchemaType}
+import org.apache.hudi.internal.schema.HoodieSchemaException
+
+import org.apache.spark.sql.types._
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.scalatest.{FunSuite, Matchers}
+
+import java.util.Arrays
+
+class TestHoodieSchemaConversionUtils extends FunSuite with Matchers {
+
+  test("test all primitive types conversion") {
+    val struct = new StructType()
+      .add("bool_field", BooleanType, false)
+      .add("byte_field", ByteType, false)
+      .add("short_field", ShortType, false)
+      .add("int_field", IntegerType, false)
+      .add("long_field", LongType, false)
+      .add("float_field", FloatType, false)
+      .add("double_field", DoubleType, false)
+      .add("string_field", StringType, false)
+      .add("binary_field", BinaryType, false)
+
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      struct, "PrimitiveTypes", "test")
+
+    // Verify all primitive type conversions
+    assert(hoodieSchema.getField("bool_field").get().schema().getType == 
HoodieSchemaType.BOOLEAN)
+    assert(hoodieSchema.getField("byte_field").get().schema().getType == 
HoodieSchemaType.INT)
+    assert(hoodieSchema.getField("short_field").get().schema().getType == 
HoodieSchemaType.INT)
+    assert(hoodieSchema.getField("int_field").get().schema().getType == 
HoodieSchemaType.INT)
+    assert(hoodieSchema.getField("long_field").get().schema().getType == 
HoodieSchemaType.LONG)
+    assert(hoodieSchema.getField("float_field").get().schema().getType == 
HoodieSchemaType.FLOAT)
+    assert(hoodieSchema.getField("double_field").get().schema().getType == 
HoodieSchemaType.DOUBLE)
+    assert(hoodieSchema.getField("string_field").get().schema().getType == 
HoodieSchemaType.STRING)
+    assert(hoodieSchema.getField("binary_field").get().schema().getType == 
HoodieSchemaType.BYTES)
+
+    // Verify roundtrip
+    val convertedStruct = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+    assert(convertedStruct.fields.length == 9)
+    assert(convertedStruct.fields(0).dataType == BooleanType)
+    assert(convertedStruct.fields(1).dataType == IntegerType) // Byte → Int
+    assert(convertedStruct.fields(2).dataType == IntegerType) // Short → Int
+    assert(convertedStruct.fields(3).dataType == IntegerType)
+    assert(convertedStruct.fields(4).dataType == LongType)
+    assert(convertedStruct.fields(5).dataType == FloatType)
+    assert(convertedStruct.fields(6).dataType == DoubleType)
+    assert(convertedStruct.fields(7).dataType == StringType)
+    assert(convertedStruct.fields(8).dataType == BinaryType)
+  }
+
+  test("test HoodieSchema to Spark conversion for all primitive types and 
enum") {
+    // Create HoodieSchema with all primitive types and enum
+    val fields = java.util.Arrays.asList(
+      HoodieSchemaField.of("bool", 
HoodieSchema.create(HoodieSchemaType.BOOLEAN)),
+      HoodieSchemaField.of("int", HoodieSchema.create(HoodieSchemaType.INT)),
+      HoodieSchemaField.of("long", HoodieSchema.create(HoodieSchemaType.LONG)),
+      HoodieSchemaField.of("float", 
HoodieSchema.create(HoodieSchemaType.FLOAT)),
+      HoodieSchemaField.of("double", 
HoodieSchema.create(HoodieSchemaType.DOUBLE)),
+      HoodieSchemaField.of("string", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+      HoodieSchemaField.of("bytes", 
HoodieSchema.create(HoodieSchemaType.BYTES)),
+      HoodieSchemaField.of("fixed", HoodieSchema.createFixed("MD5", 
"com.example", "MD5 hash", 16)),
+      HoodieSchemaField.of("enum", HoodieSchema.createEnum("Color", 
"com.example", "Color enum", Arrays.asList("RED", "GREEN", "BLUE"))),
+      HoodieSchemaField.of("null", HoodieSchema.create(HoodieSchemaType.NULL))
+    )
+    val hoodieSchema = HoodieSchema.createRecord("AllPrimitives", "test", 
null, fields)
+
+    val structType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
+    assert(structType.fields.length == 10)
+    assert(structType.fields(0).dataType == BooleanType)
+    assert(structType.fields(1).dataType == IntegerType)
+    assert(structType.fields(2).dataType == LongType)
+    assert(structType.fields(3).dataType == FloatType)
+    assert(structType.fields(4).dataType == DoubleType)
+    assert(structType.fields(5).dataType == StringType)
+    assert(structType.fields(6).dataType == BinaryType)
+    assert(structType.fields(7).dataType == BinaryType)
+    assert(structType.fields(8).dataType == StringType)
+    assert(structType.fields(9).dataType == NullType)
+    assert(structType.fields(9).nullable) // Null type is always nullable
+  }
+
+  test("test logical types conversion - date, timestamp, decimal") {
+    val struct = new StructType()
+      .add("date_field", DateType, false)
+      .add("timestamp_field", TimestampType, true)
+      .add("decimal_field", DecimalType(10, 2), false)
+      .add("decimal_field2", DecimalType(20, 5), true)
+
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      struct, "LogicalTypes", "test")
+
+    // Verify DATE logical type
+    val dateField = hoodieSchema.getField("date_field").get()
+    assert(dateField.schema().getType == HoodieSchemaType.DATE)
+    assert(!dateField.isNullable())
+
+    // Verify TIMESTAMP logical type
+    val timestampField = hoodieSchema.getField("timestamp_field").get()
+    assert(timestampField.isNullable())
+    val timestampSchema = timestampField.schema().getNonNullType()
+    assert(timestampSchema.getType == HoodieSchemaType.TIMESTAMP)
+    assert(timestampSchema.isInstanceOf[HoodieSchema.Timestamp])
+    assert(timestampSchema.asInstanceOf[HoodieSchema.Timestamp].isUtcAdjusted)
+
+    // Verify DECIMAL logical type
+    val decimalField = hoodieSchema.getField("decimal_field").get()
+    assert(decimalField.schema().getType == HoodieSchemaType.DECIMAL)
+    assert(decimalField.schema().isInstanceOf[HoodieSchema.Decimal])
+    val decimalSchema = 
decimalField.schema().asInstanceOf[HoodieSchema.Decimal]
+    assert(decimalSchema.getPrecision == 10)
+    assert(decimalSchema.getScale == 2)
+
+    // Verify roundtrip preserves logical types
+    val convertedStruct = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+    assert(convertedStruct.fields(0).dataType == DateType)
+    assert(convertedStruct.fields(1).dataType == TimestampType)
+    assert(convertedStruct.fields(2).dataType == DecimalType(10, 2))
+    assert(convertedStruct.fields(3).dataType == DecimalType(20, 5))
+  }
+
+  test("test HoodieSchema to Spark conversion for logical types") {
+    val fields = java.util.Arrays.asList(
+      HoodieSchemaField.of("date", HoodieSchema.createDate()),
+      HoodieSchemaField.of("timestamp_micros", 
HoodieSchema.createTimestampMicros()),
+      HoodieSchemaField.of("timestamp_ntz", 
HoodieSchema.createLocalTimestampMicros()),
+      HoodieSchemaField.of("decimal", HoodieSchema.createDecimal(15, 3)),
+      HoodieSchemaField.of("time_millis", HoodieSchema.createTimeMillis()),
+      HoodieSchemaField.of("time_micros", HoodieSchema.createTimeMicros()),
+      HoodieSchemaField.of("uuid", HoodieSchema.createUUID())
+    )
+    val hoodieSchema = HoodieSchema.createRecord("LogicalTypes", "test", null, 
fields)
+
+    val structType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
+    assert(structType.fields.length == 7)
+    assert(structType.fields(0).dataType == DateType)
+    assert(structType.fields(1).dataType == TimestampType)
+    assert(structType.fields(3).dataType == DecimalType(15, 3))
+    assert(structType.fields(4).dataType == IntegerType)  // time_millis -> INT
+    assert(structType.fields(5).dataType == LongType)     // time_micros -> 
LONG
+    assert(structType.fields(6).dataType == StringType)   // uuid -> STRING
+  }
+
+  test("test binary type handling") {
+    val struct = new StructType()
+      .add("binary_field", BinaryType, false)
+      .add("nullable_binary", BinaryType, true)
+
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      struct, "BinaryTypes", "test")
+
+    val binaryField = hoodieSchema.getField("binary_field").get()
+    assert(binaryField.schema().getType == HoodieSchemaType.BYTES)
+    assert(!binaryField.isNullable())
+
+    val nullableBinaryField = hoodieSchema.getField("nullable_binary").get()
+    assert(nullableBinaryField.isNullable())
+
+    // Verify roundtrip
+    val convertedStruct = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+    assert(convertedStruct.fields(0).dataType == BinaryType)
+    assert(!convertedStruct.fields(0).nullable)
+    assert(convertedStruct.fields(1).dataType == BinaryType)
+    assert(convertedStruct.fields(1).nullable)
+  }
+
+  test("test CharType and VarcharType conversion to STRING") {
+    val struct = new StructType()
+      .add("char_field", CharType(10), false)
+      .add("varchar_field", VarcharType(255), false)
+      .add("string_field", StringType, false)
+
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      struct, "CharTypes", "test")
+
+    // All should map to STRING
+    assert(hoodieSchema.getField("char_field").get().schema().getType == 
HoodieSchemaType.STRING)
+    assert(hoodieSchema.getField("varchar_field").get().schema().getType == 
HoodieSchemaType.STRING)
+    assert(hoodieSchema.getField("string_field").get().schema().getType == 
HoodieSchemaType.STRING)
+  }
+
+  test("test SchemaType enum values for logical types") {
+    // Verify that DATE, TIMESTAMP, DECIMAL are properly recognized as 
distinct types
+    val dateSchema = HoodieSchema.createDate()
+    assertEquals(dateSchema.getType, HoodieSchemaType.DATE)
+
+    val timestampSchema = HoodieSchema.createTimestampMicros()
+    assertEquals(timestampSchema.getType, HoodieSchemaType.TIMESTAMP)
+
+    val decimalSchema = HoodieSchema.createDecimal(10, 2)
+    assertEquals(decimalSchema.getType, HoodieSchemaType.DECIMAL)
+
+    // Verify conversion to Spark types
+    val dateType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToDataType(dateSchema)
+    assertEquals(dateType, DateType)
+
+    val timestampType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToDataType(timestampSchema)
+    assertEquals(timestampType, TimestampType)
+
+    val decimalType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToDataType(decimalSchema)
+    assertEquals(decimalType, DecimalType(10, 2))
+  }
+
+  test("test conversion error handling with duplicate field names") {
+    val invalidStruct = new StructType()
+      .add("field1", "string", false)
+      .add("field1", "int", false)  // Duplicate field name
+
+    the[HoodieSchemaException] thrownBy {
+      HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+        invalidStruct, "InvalidSchema", "test")
+    }
+  }
+
+  test("test empty namespace handling") {
+    val struct = new StructType().add("field", "string", false)
+
+    // Convert with empty namespace
+    val hoodieSchema1 = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      struct, "Test", "")
+
+    assert(hoodieSchema1.getName() == "Test")
+    assert(!hoodieSchema1.getNamespace().isPresent || 
hoodieSchema1.getNamespace().get() == "")
+
+    // Convert with no namespace just qualifiedName
+    val hoodieSchema2 = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      struct, "SimpleTest")
+
+    assert(hoodieSchema2.getName() == "SimpleTest")
+  }
+
+  test("test qualified name parsing") {
+    val struct = new StructType().add("field", "string", false)
+
+    // Test multi-part qualified name
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      struct, "com.example.database.Table")
+
+    assert(hoodieSchema.getName() == "Table")
+    assert(hoodieSchema.getNamespace().get() == "com.example.database")
+  }
+
+  test("test field with no comment preserves existing doc") {
+    val struct = new StructType()
+      .add("field_with_comment", "string", false, "User provided comment")
+      .add("field_without_comment", "int", false)
+
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      struct, "Comments", "test")
+
+    val field1 = hoodieSchema.getField("field_with_comment").get()
+    assert(field1.doc().get() == "User provided comment")
+
+    val field2 = hoodieSchema.getField("field_without_comment").get()
+    assert(!field2.doc().isPresent)
+  }
+
+  test("test convertHoodieSchemaToStructType using hoodie schema field") {
+    val innerFields = java.util.Arrays.asList(
+      HoodieSchemaField.of("innerKey", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+      HoodieSchemaField.of("value", 
HoodieSchema.createNullable(HoodieSchemaType.LONG))
+    )
+    val innerRecord = HoodieSchema.createRecord("InnerRecord", "test", "Test 
inner record", innerFields)
+
+    val fields = java.util.Arrays.asList(
+      HoodieSchemaField.of("key", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+      HoodieSchemaField.of("nested", innerRecord)
+    )
+    val hoodieSchema = HoodieSchema.createRecord("TestRecord", "test", "Test 
record", fields)
+
+    // Convert to Spark StructType
+    val structType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
+    // Verify structure
+    assert(structType.fields.length == 2)
+    assert(structType.fields(0).name == "key")
+    assert(structType.fields(0).dataType == StringType)
+    assert(!structType.fields(0).nullable)
+
+    assert(structType.fields(1).name == "nested")
+    assert(structType.fields(1).dataType.isInstanceOf[StructType])
+
+    val nestedStruct = structType.fields(1).dataType.asInstanceOf[StructType]
+    assert(nestedStruct.fields.length == 2)
+    assert(nestedStruct.fields(0).name == "innerKey")
+    assert(nestedStruct.fields(0).dataType == StringType)
+    assert(!nestedStruct.fields(0).nullable)
+
+    assert(nestedStruct.fields(1).name == "value")
+    assert(nestedStruct.fields(1).dataType == LongType)
+    assert(nestedStruct.fields(1).nullable)
+  }
+
+  test("test roundtrip conversion preserves schema structure") {
+    val originalStruct = new StructType()
+      .add("id", "long", false)
+      .add("name", "string", true)
+      .add("scores", ArrayType(IntegerType, containsNull = true), false)
+      .add("metadata", MapType(StringType, StringType, valueContainsNull = 
true), true)
+      .add("timestamp", TimestampType, false)
+      .add("date", DateType, true)
+
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      originalStruct, "TestSchema", "test.namespace")
+    val convertedStruct = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
+    // Should be equivalent (comparing field names, types, and nullability)
+    assert(originalStruct.fields.length == convertedStruct.fields.length)
+    originalStruct.fields.zip(convertedStruct.fields).foreach { case (orig, 
converted) =>
+      assert(orig.name == converted.name, s"Field name mismatch: ${orig.name} 
vs ${converted.name}")
+      assert(orig.dataType == converted.dataType, s"Field ${orig.name} type 
mismatch: ${orig.dataType} vs ${converted.dataType}")
+      assert(orig.nullable == converted.nullable, s"Field ${orig.name} 
nullability mismatch: ${orig.nullable} vs ${converted.nullable}")
+    }
+  }
+
+  test("test convertStructTypeToHoodieSchema preserves field comments") {
+    val struct = new StructType()
+      .add("id", "long", false, "Primary identifier")
+      .add("name", "string", true, "User display name")
+      .add("nested", new StructType()
+        .add("field1", "int", false, "Nested field comment")
+        .add("field2", "string", true), false, "Nested struct comment")
+
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      struct, "TestSchema", "test.namespace")
+
+    // Verify comments are preserved
+    val idField = hoodieSchema.getField("id").get()
+    assert(idField.doc().get() == "Primary identifier")
+
+    val nameField = hoodieSchema.getField("name").get()
+    assert(nameField.doc().get() == "User display name")
+
+    val nestedField = hoodieSchema.getField("nested").get()
+    assert(nestedField.doc().get() == "Nested struct comment")
+
+    // Verify nested field comments
+    val nestedSchema = nestedField.schema()
+    val field1 = nestedSchema.getField("field1").get()
+    assert(field1.doc().get() == "Nested field comment")
+  }
+
+  test("test complex types - arrays with nullability") {
+    val struct = new StructType()
+      .add("array_non_null_elements", ArrayType(StringType, containsNull = 
false), false)
+      .add("array_nullable_elements", ArrayType(StringType, containsNull = 
true), false)
+      .add("nullable_array", ArrayType(IntegerType, containsNull = false), 
true)
+
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      struct, "ArrayTypes", "test")
+
+    // Verify array nullability handling
+    val field1 = hoodieSchema.getField("array_non_null_elements").get()
+    assert(field1.schema().getType == HoodieSchemaType.ARRAY)
+    assert(!field1.schema().getElementType.isNullable)
+    assert(!field1.isNullable())
+
+    val field2 = hoodieSchema.getField("array_nullable_elements").get()
+    assert(field2.schema().getType == HoodieSchemaType.ARRAY)
+    assert(field2.schema().getElementType.isNullable)
+
+    val field3 = hoodieSchema.getField("nullable_array").get()
+    assert(field3.isNullable())
+
+    // Verify roundtrip
+    val convertedStruct = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+    
assert(!convertedStruct.fields(0).dataType.asInstanceOf[ArrayType].containsNull)
+    
assert(convertedStruct.fields(1).dataType.asInstanceOf[ArrayType].containsNull)
+    assert(convertedStruct.fields(2).nullable)
+  }
+
+  test("test complex types - maps with nullability") {
+    val struct = new StructType()
+      .add("map_non_null_values", MapType(StringType, IntegerType, 
valueContainsNull = false), false)
+      .add("map_nullable_values", MapType(StringType, IntegerType, 
valueContainsNull = true), false)
+      .add("nullable_map", MapType(StringType, StringType, valueContainsNull = 
false), true)
+
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      struct, "MapTypes", "test")
+
+    // Verify map value nullability
+    val field1 = hoodieSchema.getField("map_non_null_values").get()
+    assert(field1.schema().getType == HoodieSchemaType.MAP)
+    assert(!field1.schema().getValueType.isNullable)
+
+    val field2 = hoodieSchema.getField("map_nullable_values").get()
+    assert(field2.schema().getType == HoodieSchemaType.MAP)
+    assert(field2.schema().getValueType.isNullable)
+
+    val field3 = hoodieSchema.getField("nullable_map").get()
+    assert(field3.isNullable())
+
+    // Verify roundtrip
+    val convertedStruct = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+    
assert(!convertedStruct.fields(0).dataType.asInstanceOf[MapType].valueContainsNull)
+    
assert(convertedStruct.fields(1).dataType.asInstanceOf[MapType].valueContainsNull)
+    assert(convertedStruct.fields(2).nullable)
+  }
+
+  test("test arrays of complex types") {
+    val elementStruct = new StructType()
+      .add("id", "int", false)
+      .add("name", "string", true)
+
+    val struct = new StructType()
+      .add("array_of_structs", ArrayType(elementStruct, containsNull = true), 
false)
+
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      struct, "ArrayOfStructs", "test")
+
+    val arrayField = hoodieSchema.getField("array_of_structs").get()
+    assert(arrayField.schema().getType == HoodieSchemaType.ARRAY)
+
+    val elementType = arrayField.schema().getElementType
+    assert(elementType.isNullable) // Elements are nullable
+    val elementRecord = elementType.getNonNullType()
+    assert(elementRecord.getType == HoodieSchemaType.RECORD)
+    assert(elementRecord.getFields.size() == 2)
+
+    // Verify roundtrip
+    val convertedStruct = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+    val convertedArrayType = 
convertedStruct.fields(0).dataType.asInstanceOf[ArrayType]
+    assert(convertedArrayType.containsNull)
+    assert(convertedArrayType.elementType.isInstanceOf[StructType])
+  }
+
+  test("test maps of complex types") {
+    val valueStruct = new StructType()
+      .add("count", "long", false)
+      .add("metadata", "string", true)
+
+    val struct = new StructType()
+      .add("map_of_structs", MapType(StringType, valueStruct, 
valueContainsNull = true), false)
+
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      struct, "MapOfStructs", "test")
+
+    val mapField = hoodieSchema.getField("map_of_structs").get()
+    assert(mapField.schema().getType == HoodieSchemaType.MAP)
+
+    val valueType = mapField.schema().getValueType
+    assert(valueType.isNullable) // Values are nullable
+    val valueRecord = valueType.getNonNullType()
+    assert(valueRecord.getType == HoodieSchemaType.RECORD)
+    assert(valueRecord.getFields.size() == 2)
+
+    // Verify roundtrip
+    val convertedStruct = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+    val convertedMapType = 
convertedStruct.fields(0).dataType.asInstanceOf[MapType]
+    assert(convertedMapType.valueContainsNull)
+    assert(convertedMapType.valueType.isInstanceOf[StructType])
+  }
+
+  test("test namespace hierarchy for nested records") {
+    val level2 = new StructType().add("field2", "string", false)
+    val level1 = new StructType().add("field1", "int", false).add("nested2", 
level2, false)
+    val struct = new StructType().add("field0", "long", false).add("nested1", 
level1, false)
+
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      struct, "Root", "com.example")
+
+    // Verify namespace hierarchy
+    assert(hoodieSchema.getNamespace().orElse(null) == "com.example")
+    assert(hoodieSchema.getName() == "Root")
+
+    val nested1 = hoodieSchema.getField("nested1").get().schema()
+    assert(nested1.getNamespace().orElse(null) == "com.example.Root")
+
+    val nested2 = nested1.getField("nested2").get().schema()
+    assert(nested2.getNamespace().orElse(null) == "com.example.Root.nested1")
+  }
+
+  test("test alignFieldsNullability with HoodieSchema") {
+    val sourceStruct = new StructType()
+      .add("field1", "string", false)  // Non-nullable in source
+      .add("field2", "int", true)      // Nullable in source
+      .add("nested", new StructType()
+        .add("inner1", "long", false)
+        .add("inner2", "string", true), false)
+
+    // Create HoodieSchema with different nullability
+    val nestedFields = java.util.Arrays.asList(
+      HoodieSchemaField.of("inner1", 
HoodieSchema.createNullable(HoodieSchemaType.LONG)),
+      HoodieSchemaField.of("inner2", 
HoodieSchema.create(HoodieSchemaType.STRING))
+    )
+    val nestedSchema = HoodieSchema.createRecord("nested", "test", null, 
nestedFields)
+
+    val fields = java.util.Arrays.asList(
+      HoodieSchemaField.of("field1", 
HoodieSchema.createNullable(HoodieSchemaType.STRING)), // Nullable in target
+      HoodieSchemaField.of("field2", 
HoodieSchema.create(HoodieSchemaType.INT)),            // Non-nullable in target
+      HoodieSchemaField.of("nested", nestedSchema)
+    )
+    val hoodieSchema = HoodieSchema.createRecord("TestSchema", "test", null, 
fields)
+
+    // Align nullability
+    val alignedStruct = 
HoodieSchemaConversionUtils.alignFieldsNullability(sourceStruct, hoodieSchema)
+
+    // Verify alignment (should match HoodieSchema nullability)
+    assert(alignedStruct.fields(0).nullable)  // Aligned to HoodieSchema
+    assert(!alignedStruct.fields(1).nullable) // Aligned to HoodieSchema
+
+    val nestedStruct = 
alignedStruct.fields(2).dataType.asInstanceOf[StructType]
+    assert(nestedStruct.fields(0).nullable)   // Aligned to HoodieSchema
+    assert(!nestedStruct.fields(1).nullable)  // Aligned to HoodieSchema
+  }
+
+  test("test alignFieldsNullability with complex types") {
+    val sourceStruct = new StructType()
+      .add("arrayField", ArrayType(StringType, containsNull = false), false)
+      .add("mapField", MapType(StringType, IntegerType, valueContainsNull = 
false), false)
+
+    // Create HoodieSchema where element/value types are nullable
+    val fields = java.util.Arrays.asList(
+      HoodieSchemaField.of("arrayField", 
HoodieSchema.createArray(HoodieSchema.createNullable(HoodieSchemaType.STRING))),
+      HoodieSchemaField.of("mapField", 
HoodieSchema.createMap(HoodieSchema.createNullable(HoodieSchemaType.INT)))
+    )
+    val hoodieSchema = HoodieSchema.createRecord("ComplexNullability", "test", 
null, fields)
+
+    val alignedStruct = 
HoodieSchemaConversionUtils.alignFieldsNullability(sourceStruct, hoodieSchema)
+
+    // Verify that array element and map value nullability is aligned
+    val arrayType = alignedStruct.fields(0).dataType.asInstanceOf[ArrayType]
+    assert(arrayType.containsNull) // Aligned to nullable elements
+
+    val mapType = alignedStruct.fields(1).dataType.asInstanceOf[MapType]
+    assert(mapType.valueContainsNull) // Aligned to nullable values
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index 6ca8b7455e63..dbce843fa936 100644
--- 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, 
DataFrameUtil, Dataset, HoodieUnsafeUtils, HoodieUTF8StringFactory, 
Spark3DataFrameUtil, Spark3HoodieUnsafeUtils, Spark3HoodieUTF8StringFactory, 
SparkSession, SQLContext}
 import 
org.apache.spark.sql.FileFormatUtilsForFileGroupReader.applyFiltersToPlan
-import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, 
HoodieSparkAvroSchemaConverters}
+import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, 
HoodieSparkAvroSchemaConverters, HoodieSparkSchemaConverters}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
diff --git 
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
 
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
index f897d00d41c6..1d0391aa219c 100644
--- 
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
@@ -34,7 +34,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, 
DataFrameUtil, ExpressionColumnNodeWrapper, HoodieUnsafeUtils, 
HoodieUTF8StringFactory, Spark4DataFrameUtil, Spark4HoodieUnsafeUtils, 
Spark4HoodieUTF8StringFactory, SparkSession, SQLContext}
 import 
org.apache.spark.sql.FileFormatUtilsForFileGroupReader.applyFiltersToPlan
-import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, 
HoodieSparkAvroSchemaConverters}
+import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, 
HoodieSparkAvroSchemaConverters, HoodieSparkSchemaConverters}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.CatalogTable

Reply via email to