This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a7674fbc2760 feat(schema): Add converter for Spark StructType to
HoodieSchema (#17475)
a7674fbc2760 is described below
commit a7674fbc276088b8380175a389f868a38addce77
Author: Rahil C <[email protected]>
AuthorDate: Wed Dec 10 20:48:00 2025 -0800
feat(schema): Add converter for Spark StructType to HoodieSchema (#17475)
* feat(schema): Add converter for Spark StructType to HoodieSchema
* initial attempt at following spark adapter pattern for hoodie schema
* fix spark 3.4 issue
* fix test
* get spark 4 to compile and work
* Reduce code duplication between all spark versions, follow pattern of
existing avro converter by using private[sql] and spark package
org.apache.spark.sql.avro
* get spark 4 compiling after refactor
* address tim comments
* address tim test comments
* address tim comment
* address ethan minor comment without refactor
* refactor with ethan recommendations
* fix spark4
* add scala doc comment
---
.../apache/hudi/HoodieSchemaConversionUtils.scala | 197 ++++++++
.../sql/avro/HoodieSparkSchemaConverters.scala | 246 +++++++++
.../hudi/TestHoodieSchemaConversionUtils.scala | 550 +++++++++++++++++++++
.../spark/sql/adapter/BaseSpark3Adapter.scala | 2 +-
.../spark/sql/adapter/BaseSpark4Adapter.scala | 2 +-
5 files changed, 995 insertions(+), 2 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
new file mode 100644
index 000000000000..f25c098c683f
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
+import org.apache.hudi.internal.schema.HoodieSchemaException
+import org.apache.spark.sql.avro.HoodieSparkSchemaConverters
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Utilities for converting between HoodieSchema and Spark SQL schemas.
+ *
+ * This object provides high-level conversion methods with utilities for
+ * handling defaults and nullability alignment.
+ */
+object HoodieSchemaConversionUtils {
+
+ /**
+ * Converts HoodieSchema to Catalyst's StructType.
+ *
+ * @param hoodieSchema HoodieSchema to convert
+ * @return Spark StructType corresponding to the HoodieSchema
+ * @throws HoodieSchemaException if conversion fails
+ */
+ def convertHoodieSchemaToStructType(hoodieSchema: HoodieSchema): StructType
= {
+ try {
+ HoodieSparkSchemaConverters.toSqlType(hoodieSchema) match {
+ case (dataType, _) => dataType.asInstanceOf[StructType]
+ }
+ } catch {
+ case e: Exception => throw new HoodieSchemaException(
+ s"Failed to convert HoodieSchema to StructType: $hoodieSchema", e)
+ }
+ }
+
+ /**
+ * Converts HoodieSchema to Catalyst's DataType (general purpose, not just
StructType).
+ *
+ * @param hoodieSchema HoodieSchema to convert
+ * @return Spark DataType corresponding to the HoodieSchema
+ * @throws HoodieSchemaException if conversion fails
+ */
+ def convertHoodieSchemaToDataType(hoodieSchema: HoodieSchema): DataType = {
+ try {
+ HoodieSparkSchemaConverters.toSqlType(hoodieSchema) match {
+ case (dataType, _) => dataType
+ }
+ } catch {
+ case e: Exception => throw new HoodieSchemaException(
+ s"Failed to convert HoodieSchema to DataType: $hoodieSchema", e)
+ }
+ }
+
+ /**
+ * Converts StructType to HoodieSchema.
+ *
+ * @param structType Catalyst's StructType or DataType
+ * @param qualifiedName HoodieSchema qualified name (namespace.name format)
+ * @return HoodieSchema corresponding to the Spark DataType
+ * @throws HoodieSchemaException if conversion fails
+ */
+ def convertStructTypeToHoodieSchema(structType: DataType, qualifiedName:
String): HoodieSchema = {
+ val (namespace, name) = {
+ val parts = qualifiedName.split('.')
+ if (parts.length > 1) {
+ (parts.init.mkString("."), parts.last)
+ } else {
+ ("", parts.head)
+ }
+ }
+ convertStructTypeToHoodieSchema(structType, name, namespace)
+ }
+
+ /**
+ * Converts StructType to HoodieSchema.
+ *
+ * @param structType Catalyst's StructType or DataType
+ * @param structName Schema record name
+ * @param recordNamespace Schema record namespace
+ * @return HoodieSchema corresponding to the Spark DataType
+ * @throws HoodieSchemaException if conversion fails
+ */
+ def convertStructTypeToHoodieSchema(structType: DataType,
+ structName: String,
+ recordNamespace: String): HoodieSchema =
{
+ try {
+ HoodieSparkSchemaConverters.toHoodieType(structType, nullable = false,
structName, recordNamespace)
+ } catch {
+ case e: Exception => throw new HoodieSchemaException(
+ s"Failed to convert struct type to HoodieSchema: $structType", e)
+ }
+ }
+
+ /**
+ * Recursively aligns the nullable property of Spark schema fields with
HoodieSchema.
+ *
+ * @param sourceSchema Source Spark StructType to align
+ * @param hoodieSchema HoodieSchema to use as source of truth
+ * @return StructType with aligned nullability
+ */
+ def alignFieldsNullability(sourceSchema: StructType, hoodieSchema:
HoodieSchema): StructType = {
+ val hoodieFieldsMap = hoodieSchema.getFields.asScala.map(f => (f.name(),
f)).toMap
+
+ val alignedFields = sourceSchema.fields.map { field =>
+ hoodieFieldsMap.get(field.name) match {
+ case Some(hoodieField) =>
+ val alignedField = field.copy(nullable = hoodieField.isNullable)
+
+ field.dataType match {
+ case structType: StructType =>
+ val nestedSchema = hoodieField.schema().getNonNullType
+ if (nestedSchema.getType == HoodieSchemaType.RECORD) {
+ alignedField.copy(dataType =
alignFieldsNullability(structType, nestedSchema))
+ } else {
+ alignedField
+ }
+
+ case ArrayType(elementType, _) =>
+ val arraySchema = hoodieField.schema().getNonNullType
+ if (arraySchema.getType == HoodieSchemaType.ARRAY) {
+ val elemSchema = arraySchema.getElementType
+ val newElementType = updateElementType(elementType, elemSchema)
+ alignedField.copy(dataType = ArrayType(newElementType,
elemSchema.isNullable))
+ } else {
+ alignedField
+ }
+
+ case MapType(keyType, valueType, _) =>
+ val mapSchema = hoodieField.schema().getNonNullType
+ if (mapSchema.getType == HoodieSchemaType.MAP) {
+ val valueSchema = mapSchema.getValueType
+ val newValueType = updateElementType(valueType, valueSchema)
+ alignedField.copy(dataType = MapType(keyType, newValueType,
valueSchema.isNullable))
+ } else {
+ alignedField
+ }
+
+ case _ => alignedField
+ }
+
+ case None => field.copy()
+ }
+ }
+
+ StructType(alignedFields)
+ }
+
+ /**
+ * Recursively updates element types for complex types (arrays, maps,
structs).
+ */
+ private def updateElementType(dataType: DataType, hoodieSchema:
HoodieSchema): DataType = {
+ dataType match {
+ case structType: StructType =>
+ if (hoodieSchema.getType == HoodieSchemaType.RECORD) {
+ alignFieldsNullability(structType, hoodieSchema)
+ } else {
+ structType
+ }
+
+ case ArrayType(elemType, _) =>
+ if (hoodieSchema.getType == HoodieSchemaType.ARRAY) {
+ val elemSchema = hoodieSchema.getElementType
+ ArrayType(updateElementType(elemType, elemSchema),
elemSchema.isNullable)
+ } else {
+ dataType
+ }
+
+ case MapType(keyType, valueType, _) =>
+ if (hoodieSchema.getType == HoodieSchemaType.MAP) {
+ val valueSchema = hoodieSchema.getValueType
+ MapType(keyType, updateElementType(valueType, valueSchema),
valueSchema.isNullable)
+ } else {
+ dataType
+ }
+
+ case _ => dataType
+ }
+ }
+}
\ No newline at end of file
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
new file mode 100644
index 000000000000..a7c41cfa2517
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.hudi.common.schema.HoodieSchema.TimePrecision
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaField,
HoodieSchemaType}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.types.Decimal.minBytesForPrecision
+import org.apache.spark.sql.types._
+
+import scala.collection.JavaConverters._
+
+/**
+ * This object contains methods that are used to convert HoodieSchema to Spark
SQL schemas and vice versa.
+ *
+ * This provides direct conversion between HoodieSchema and Spark DataType
+ * without going through Avro Schema intermediary.
+ *
+ * NOTE: the package of this class is intentionally kept as
"org.apache.spark.sql.avro" which is similar to the existing
+ * Spark Avro connector's SchemaConverters.scala
+ *
(https://github.com/apache/spark/blob/master/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala).
+ * The reason for this is so that Spark 3.3 is able to access private spark
sql type classes like TimestampNTZType.
+ */
+
+@DeveloperApi
+object HoodieSparkSchemaConverters {
+
+ /**
+ * Internal wrapper for SQL data type and nullability.
+ */
+ case class SchemaType(dataType: DataType, nullable: Boolean)
+
+ def toSqlType(hoodieSchema: HoodieSchema): (DataType, Boolean) = {
+ val result = toSqlTypeHelper(hoodieSchema, Set.empty)
+ (result.dataType, result.nullable)
+ }
+
+ def toHoodieType(catalystType: DataType, nullable: Boolean, recordName:
String, nameSpace: String): HoodieSchema = {
+ val schema = catalystType match {
+ // Primitive types
+ case BooleanType => HoodieSchema.create(HoodieSchemaType.BOOLEAN)
+ case ByteType | ShortType | IntegerType =>
HoodieSchema.create(HoodieSchemaType.INT)
+ case LongType => HoodieSchema.create(HoodieSchemaType.LONG)
+ case DateType => HoodieSchema.createDate()
+ case TimestampType => HoodieSchema.createTimestampMicros()
+ case TimestampNTZType => HoodieSchema.createLocalTimestampMicros()
+ case FloatType => HoodieSchema.create(HoodieSchemaType.FLOAT)
+ case DoubleType => HoodieSchema.create(HoodieSchemaType.DOUBLE)
+ case StringType | _: CharType | _: VarcharType =>
HoodieSchema.create(HoodieSchemaType.STRING)
+ case NullType => HoodieSchema.create(HoodieSchemaType.NULL)
+ case BinaryType => HoodieSchema.create(HoodieSchemaType.BYTES)
+
+ case d: DecimalType =>
+ val fixedSize = minBytesForPrecision(d.precision)
+ val name = nameSpace match {
+ case "" => s"$recordName.fixed"
+ case _ => s"$nameSpace.$recordName.fixed"
+ }
+ HoodieSchema.createDecimal(name, nameSpace, null, d.precision,
d.scale, fixedSize)
+
+ // Complex types
+ case ArrayType(elementType, containsNull) =>
+ val elementSchema = toHoodieType(elementType, containsNull,
recordName, nameSpace)
+ HoodieSchema.createArray(elementSchema)
+
+ case MapType(StringType, valueType, valueContainsNull) =>
+ val valueSchema = toHoodieType(valueType, valueContainsNull,
recordName, nameSpace)
+ HoodieSchema.createMap(valueSchema)
+
+ case st: StructType =>
+ val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName"
else recordName
+
+ // Check if this might be a union (using heuristic like Avro converter)
+ if (canBeUnion(st)) {
+ val nonNullUnionFieldTypes = st.map { f =>
+ toHoodieType(f.dataType, nullable = false, f.name, childNameSpace)
+ }
+ val unionFieldTypes = if (nullable) {
+ (HoodieSchema.create(HoodieSchemaType.NULL) +:
nonNullUnionFieldTypes).asJava
+ } else {
+ nonNullUnionFieldTypes.asJava
+ }
+ HoodieSchema.createUnion(unionFieldTypes)
+ } else {
+ // Create record
+ val fields = st.map { f =>
+ val fieldSchema = toHoodieType(f.dataType, f.nullable, f.name,
childNameSpace)
+ val doc = f.getComment.orNull
+ HoodieSchemaField.of(f.name, fieldSchema, doc)
+ }
+
+ HoodieSchema.createRecord(recordName, nameSpace, null, fields.asJava)
+ }
+
+ case other => throw new IncompatibleSchemaException(s"Unexpected Spark
DataType: $other")
+ }
+
+ // Wrap with null union if nullable (and not already a union)
+ if (nullable && catalystType != NullType && schema.getType !=
HoodieSchemaType.UNION) {
+ HoodieSchema.createNullable(schema)
+ } else {
+ schema
+ }
+ }
+
+ private def toSqlTypeHelper(hoodieSchema: HoodieSchema, existingRecordNames:
Set[String]): SchemaType = {
+ hoodieSchema.getType match {
+ // Primitive types
+ case HoodieSchemaType.INT => SchemaType(IntegerType, nullable = false)
+ case HoodieSchemaType.STRING | HoodieSchemaType.ENUM =>
SchemaType(StringType, nullable = false)
+ case HoodieSchemaType.BOOLEAN => SchemaType(BooleanType, nullable =
false)
+ case HoodieSchemaType.BYTES | HoodieSchemaType.FIXED =>
SchemaType(BinaryType, nullable = false)
+ case HoodieSchemaType.DOUBLE => SchemaType(DoubleType, nullable = false)
+ case HoodieSchemaType.FLOAT => SchemaType(FloatType, nullable = false)
+ case HoodieSchemaType.LONG => SchemaType(LongType, nullable = false)
+ case HoodieSchemaType.NULL => SchemaType(NullType, nullable = true)
+
+ // Logical types
+ case HoodieSchemaType.DATE =>
+ SchemaType(DateType, nullable = false)
+
+ case HoodieSchemaType.TIMESTAMP =>
+ hoodieSchema match {
+ case ts: HoodieSchema.Timestamp if !ts.isUtcAdjusted =>
+ SchemaType(TimestampNTZType, nullable = false)
+ case _ =>
+ SchemaType(TimestampType, nullable = false)
+ }
+
+ case HoodieSchemaType.DECIMAL =>
+ hoodieSchema match {
+ case dec: HoodieSchema.Decimal =>
+ SchemaType(DecimalType(dec.getPrecision, dec.getScale), nullable =
false)
+ case _ =>
+ throw new IncompatibleSchemaException(
+ s"DECIMAL type must be HoodieSchema.Decimal instance, got:
${hoodieSchema.getClass}")
+ }
+
+ case HoodieSchemaType.TIME =>
+ hoodieSchema match {
+ case time: HoodieSchema.Time =>
+ time.getPrecision match {
+ case TimePrecision.MILLIS => SchemaType(IntegerType, nullable =
false)
+ case TimePrecision.MICROS => SchemaType(LongType, nullable =
false)
+ }
+ case _ =>
+ throw new IncompatibleSchemaException(
+ s"TIME type must be HoodieSchema.Time instance, got:
${hoodieSchema.getClass}")
+ }
+
+ case HoodieSchemaType.UUID =>
+ SchemaType(StringType, nullable = false)
+
+ // Complex types
+ case HoodieSchemaType.RECORD =>
+ val fullName = hoodieSchema.getFullName
+ if (existingRecordNames.contains(fullName)) {
+ throw new IncompatibleSchemaException(
+ s"""
+ |Found recursive reference in HoodieSchema, which cannot be
processed by Spark:
+ |$fullName
+ """.stripMargin)
+ }
+ val newRecordNames = existingRecordNames + fullName
+ val fields = hoodieSchema.getFields.asScala.map { f =>
+ val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
+ val metadata = if (f.doc().isPresent) {
+ new MetadataBuilder().putString("comment", f.doc().get()).build()
+ } else {
+ Metadata.empty
+ }
+ StructField(f.name(), schemaType.dataType, schemaType.nullable,
metadata)
+ }
+ SchemaType(StructType(fields.toSeq), nullable = false)
+
+ case HoodieSchemaType.ARRAY =>
+ val elementSchema = hoodieSchema.getElementType
+ val schemaType = toSqlTypeHelper(elementSchema, existingRecordNames)
+ SchemaType(ArrayType(schemaType.dataType, containsNull =
schemaType.nullable), nullable = false)
+
+ case HoodieSchemaType.MAP =>
+ val valueSchema = hoodieSchema.getValueType
+ val schemaType = toSqlTypeHelper(valueSchema, existingRecordNames)
+ SchemaType(MapType(StringType, schemaType.dataType, valueContainsNull
= schemaType.nullable), nullable = false)
+
+ case HoodieSchemaType.UNION =>
+ if (hoodieSchema.isNullable) {
+ // Union with null - extract non-null type and mark as nullable
+ val types = hoodieSchema.getTypes.asScala
+ val remainingTypes = types.filter(_.getType != HoodieSchemaType.NULL)
+ if (remainingTypes.size == 1) {
+ toSqlTypeHelper(remainingTypes.head,
existingRecordNames).copy(nullable = true)
+ } else {
+ toSqlTypeHelper(HoodieSchema.createUnion(remainingTypes.asJava),
existingRecordNames)
+ .copy(nullable = true)
+ }
+ } else {
+ // Union without null - handle type promotions and member structs
+ val types = hoodieSchema.getTypes.asScala
+ types.map(_.getType).toSeq match {
+ case Seq(t) =>
+ toSqlTypeHelper(types.head, existingRecordNames)
+ case Seq(t1, t2) if Set(t1, t2) == Set(HoodieSchemaType.INT,
HoodieSchemaType.LONG) =>
+ SchemaType(LongType, nullable = false)
+ case Seq(t1, t2) if Set(t1, t2) == Set(HoodieSchemaType.FLOAT,
HoodieSchemaType.DOUBLE) =>
+ SchemaType(DoubleType, nullable = false)
+ case _ =>
+ // Convert to struct with member0, member1, ... fields (like
Avro union handling)
+ val fields = types.zipWithIndex.map {
+ case (s, i) =>
+ val schemaType = toSqlTypeHelper(s, existingRecordNames)
+ StructField(s"member$i", schemaType.dataType, nullable =
true)
+ }
+ SchemaType(StructType(fields.toSeq), nullable = false)
+ }
+ }
+
+ case other => throw new IncompatibleSchemaException(s"Unsupported
HoodieSchemaType: $other")
+ }
+ }
+
+ private def canBeUnion(st: StructType): Boolean = {
+ st.fields.length > 0 &&
+ st.forall { f =>
+ f.name.matches("member\\d+") && f.nullable
+ }
+ }
+}
+
+private[avro] class IncompatibleSchemaException(msg: String, ex: Throwable =
null) extends Exception(msg, ex)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSchemaConversionUtils.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSchemaConversionUtils.scala
new file mode 100644
index 000000000000..0bda52b13481
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSchemaConversionUtils.scala
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaField,
HoodieSchemaType}
+import org.apache.hudi.internal.schema.HoodieSchemaException
+
+import org.apache.spark.sql.types._
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.scalatest.{FunSuite, Matchers}
+
+import java.util.Arrays
+
+class TestHoodieSchemaConversionUtils extends FunSuite with Matchers {
+
+ test("test all primitive types conversion") {
+ val struct = new StructType()
+ .add("bool_field", BooleanType, false)
+ .add("byte_field", ByteType, false)
+ .add("short_field", ShortType, false)
+ .add("int_field", IntegerType, false)
+ .add("long_field", LongType, false)
+ .add("float_field", FloatType, false)
+ .add("double_field", DoubleType, false)
+ .add("string_field", StringType, false)
+ .add("binary_field", BinaryType, false)
+
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ struct, "PrimitiveTypes", "test")
+
+ // Verify all primitive type conversions
+ assert(hoodieSchema.getField("bool_field").get().schema().getType ==
HoodieSchemaType.BOOLEAN)
+ assert(hoodieSchema.getField("byte_field").get().schema().getType ==
HoodieSchemaType.INT)
+ assert(hoodieSchema.getField("short_field").get().schema().getType ==
HoodieSchemaType.INT)
+ assert(hoodieSchema.getField("int_field").get().schema().getType ==
HoodieSchemaType.INT)
+ assert(hoodieSchema.getField("long_field").get().schema().getType ==
HoodieSchemaType.LONG)
+ assert(hoodieSchema.getField("float_field").get().schema().getType ==
HoodieSchemaType.FLOAT)
+ assert(hoodieSchema.getField("double_field").get().schema().getType ==
HoodieSchemaType.DOUBLE)
+ assert(hoodieSchema.getField("string_field").get().schema().getType ==
HoodieSchemaType.STRING)
+ assert(hoodieSchema.getField("binary_field").get().schema().getType ==
HoodieSchemaType.BYTES)
+
+ // Verify roundtrip
+ val convertedStruct =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+ assert(convertedStruct.fields.length == 9)
+ assert(convertedStruct.fields(0).dataType == BooleanType)
+ assert(convertedStruct.fields(1).dataType == IntegerType) // Byte → Int
+ assert(convertedStruct.fields(2).dataType == IntegerType) // Short → Int
+ assert(convertedStruct.fields(3).dataType == IntegerType)
+ assert(convertedStruct.fields(4).dataType == LongType)
+ assert(convertedStruct.fields(5).dataType == FloatType)
+ assert(convertedStruct.fields(6).dataType == DoubleType)
+ assert(convertedStruct.fields(7).dataType == StringType)
+ assert(convertedStruct.fields(8).dataType == BinaryType)
+ }
+
+ test("test HoodieSchema to Spark conversion for all primitive types and
enum") {
+ // Create HoodieSchema with all primitive types and enum
+ val fields = java.util.Arrays.asList(
+ HoodieSchemaField.of("bool",
HoodieSchema.create(HoodieSchemaType.BOOLEAN)),
+ HoodieSchemaField.of("int", HoodieSchema.create(HoodieSchemaType.INT)),
+ HoodieSchemaField.of("long", HoodieSchema.create(HoodieSchemaType.LONG)),
+ HoodieSchemaField.of("float",
HoodieSchema.create(HoodieSchemaType.FLOAT)),
+ HoodieSchemaField.of("double",
HoodieSchema.create(HoodieSchemaType.DOUBLE)),
+ HoodieSchemaField.of("string",
HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of("bytes",
HoodieSchema.create(HoodieSchemaType.BYTES)),
+ HoodieSchemaField.of("fixed", HoodieSchema.createFixed("MD5",
"com.example", "MD5 hash", 16)),
+ HoodieSchemaField.of("enum", HoodieSchema.createEnum("Color",
"com.example", "Color enum", Arrays.asList("RED", "GREEN", "BLUE"))),
+ HoodieSchemaField.of("null", HoodieSchema.create(HoodieSchemaType.NULL))
+ )
+ val hoodieSchema = HoodieSchema.createRecord("AllPrimitives", "test",
null, fields)
+
+ val structType =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
+ assert(structType.fields.length == 10)
+ assert(structType.fields(0).dataType == BooleanType)
+ assert(structType.fields(1).dataType == IntegerType)
+ assert(structType.fields(2).dataType == LongType)
+ assert(structType.fields(3).dataType == FloatType)
+ assert(structType.fields(4).dataType == DoubleType)
+ assert(structType.fields(5).dataType == StringType)
+ assert(structType.fields(6).dataType == BinaryType)
+ assert(structType.fields(7).dataType == BinaryType)
+ assert(structType.fields(8).dataType == StringType)
+ assert(structType.fields(9).dataType == NullType)
+ assert(structType.fields(9).nullable) // Null type is always nullable
+ }
+
+ test("test logical types conversion - date, timestamp, decimal") {
+ val struct = new StructType()
+ .add("date_field", DateType, false)
+ .add("timestamp_field", TimestampType, true)
+ .add("decimal_field", DecimalType(10, 2), false)
+ .add("decimal_field2", DecimalType(20, 5), true)
+
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ struct, "LogicalTypes", "test")
+
+ // Verify DATE logical type
+ val dateField = hoodieSchema.getField("date_field").get()
+ assert(dateField.schema().getType == HoodieSchemaType.DATE)
+ assert(!dateField.isNullable())
+
+ // Verify TIMESTAMP logical type
+ val timestampField = hoodieSchema.getField("timestamp_field").get()
+ assert(timestampField.isNullable())
+ val timestampSchema = timestampField.schema().getNonNullType()
+ assert(timestampSchema.getType == HoodieSchemaType.TIMESTAMP)
+ assert(timestampSchema.isInstanceOf[HoodieSchema.Timestamp])
+ assert(timestampSchema.asInstanceOf[HoodieSchema.Timestamp].isUtcAdjusted)
+
+ // Verify DECIMAL logical type
+ val decimalField = hoodieSchema.getField("decimal_field").get()
+ assert(decimalField.schema().getType == HoodieSchemaType.DECIMAL)
+ assert(decimalField.schema().isInstanceOf[HoodieSchema.Decimal])
+ val decimalSchema =
decimalField.schema().asInstanceOf[HoodieSchema.Decimal]
+ assert(decimalSchema.getPrecision == 10)
+ assert(decimalSchema.getScale == 2)
+
+ // Verify roundtrip preserves logical types
+ val convertedStruct =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+ assert(convertedStruct.fields(0).dataType == DateType)
+ assert(convertedStruct.fields(1).dataType == TimestampType)
+ assert(convertedStruct.fields(2).dataType == DecimalType(10, 2))
+ assert(convertedStruct.fields(3).dataType == DecimalType(20, 5))
+ }
+
+ test("test HoodieSchema to Spark conversion for logical types") {
+ val fields = java.util.Arrays.asList(
+ HoodieSchemaField.of("date", HoodieSchema.createDate()),
+ HoodieSchemaField.of("timestamp_micros",
HoodieSchema.createTimestampMicros()),
+ HoodieSchemaField.of("timestamp_ntz",
HoodieSchema.createLocalTimestampMicros()),
+ HoodieSchemaField.of("decimal", HoodieSchema.createDecimal(15, 3)),
+ HoodieSchemaField.of("time_millis", HoodieSchema.createTimeMillis()),
+ HoodieSchemaField.of("time_micros", HoodieSchema.createTimeMicros()),
+ HoodieSchemaField.of("uuid", HoodieSchema.createUUID())
+ )
+ val hoodieSchema = HoodieSchema.createRecord("LogicalTypes", "test", null,
fields)
+
+ val structType =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
+ assert(structType.fields.length == 7)
+ assert(structType.fields(0).dataType == DateType)
+ assert(structType.fields(1).dataType == TimestampType)
+ assert(structType.fields(3).dataType == DecimalType(15, 3))
+ assert(structType.fields(4).dataType == IntegerType) // time_millis -> INT
+ assert(structType.fields(5).dataType == LongType) // time_micros ->
LONG
+ assert(structType.fields(6).dataType == StringType) // uuid -> STRING
+ }
+
+ test("test binary type handling") {
+ val struct = new StructType()
+ .add("binary_field", BinaryType, false)
+ .add("nullable_binary", BinaryType, true)
+
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ struct, "BinaryTypes", "test")
+
+ val binaryField = hoodieSchema.getField("binary_field").get()
+ assert(binaryField.schema().getType == HoodieSchemaType.BYTES)
+ assert(!binaryField.isNullable())
+
+ val nullableBinaryField = hoodieSchema.getField("nullable_binary").get()
+ assert(nullableBinaryField.isNullable())
+
+ // Verify roundtrip
+ val convertedStruct =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+ assert(convertedStruct.fields(0).dataType == BinaryType)
+ assert(!convertedStruct.fields(0).nullable)
+ assert(convertedStruct.fields(1).dataType == BinaryType)
+ assert(convertedStruct.fields(1).nullable)
+ }
+
+ test("test CharType and VarcharType conversion to STRING") {
+ val struct = new StructType()
+ .add("char_field", CharType(10), false)
+ .add("varchar_field", VarcharType(255), false)
+ .add("string_field", StringType, false)
+
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ struct, "CharTypes", "test")
+
+ // All should map to STRING
+ assert(hoodieSchema.getField("char_field").get().schema().getType ==
HoodieSchemaType.STRING)
+ assert(hoodieSchema.getField("varchar_field").get().schema().getType ==
HoodieSchemaType.STRING)
+ assert(hoodieSchema.getField("string_field").get().schema().getType ==
HoodieSchemaType.STRING)
+ }
+
+ test("test SchemaType enum values for logical types") {
+ // Verify that DATE, TIMESTAMP, DECIMAL are properly recognized as
distinct types
+ val dateSchema = HoodieSchema.createDate()
+ assertEquals(dateSchema.getType, HoodieSchemaType.DATE)
+
+ val timestampSchema = HoodieSchema.createTimestampMicros()
+ assertEquals(timestampSchema.getType, HoodieSchemaType.TIMESTAMP)
+
+ val decimalSchema = HoodieSchema.createDecimal(10, 2)
+ assertEquals(decimalSchema.getType, HoodieSchemaType.DECIMAL)
+
+ // Verify conversion to Spark types
+ val dateType =
HoodieSchemaConversionUtils.convertHoodieSchemaToDataType(dateSchema)
+ assertEquals(dateType, DateType)
+
+ val timestampType =
HoodieSchemaConversionUtils.convertHoodieSchemaToDataType(timestampSchema)
+ assertEquals(timestampType, TimestampType)
+
+ val decimalType =
HoodieSchemaConversionUtils.convertHoodieSchemaToDataType(decimalSchema)
+ assertEquals(decimalType, DecimalType(10, 2))
+ }
+
+ test("test conversion error handling with duplicate field names") {
+ val invalidStruct = new StructType()
+ .add("field1", "string", false)
+ .add("field1", "int", false) // Duplicate field name
+
+ the[HoodieSchemaException] thrownBy {
+ HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ invalidStruct, "InvalidSchema", "test")
+ }
+ }
+
+ test("test empty namespace handling") {
+ val struct = new StructType().add("field", "string", false)
+
+ // Convert with empty namespace
+ val hoodieSchema1 =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ struct, "Test", "")
+
+ assert(hoodieSchema1.getName() == "Test")
+ assert(!hoodieSchema1.getNamespace().isPresent ||
hoodieSchema1.getNamespace().get() == "")
+
+ // Convert with no namespace just qualifiedName
+ val hoodieSchema2 =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ struct, "SimpleTest")
+
+ assert(hoodieSchema2.getName() == "SimpleTest")
+ }
+
+ test("test qualified name parsing") {
+ val struct = new StructType().add("field", "string", false)
+
+ // Test multi-part qualified name
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ struct, "com.example.database.Table")
+
+ assert(hoodieSchema.getName() == "Table")
+ assert(hoodieSchema.getNamespace().get() == "com.example.database")
+ }
+
+ test("test field with no comment preserves existing doc") {
+ val struct = new StructType()
+ .add("field_with_comment", "string", false, "User provided comment")
+ .add("field_without_comment", "int", false)
+
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ struct, "Comments", "test")
+
+ val field1 = hoodieSchema.getField("field_with_comment").get()
+ assert(field1.doc().get() == "User provided comment")
+
+ val field2 = hoodieSchema.getField("field_without_comment").get()
+ assert(!field2.doc().isPresent)
+ }
+
+ test("test convertHoodieSchemaToStructType using hoodie schema field") {
+ val innerFields = java.util.Arrays.asList(
+ HoodieSchemaField.of("innerKey",
HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of("value",
HoodieSchema.createNullable(HoodieSchemaType.LONG))
+ )
+ val innerRecord = HoodieSchema.createRecord("InnerRecord", "test", "Test
inner record", innerFields)
+
+ val fields = java.util.Arrays.asList(
+ HoodieSchemaField.of("key",
HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of("nested", innerRecord)
+ )
+ val hoodieSchema = HoodieSchema.createRecord("TestRecord", "test", "Test
record", fields)
+
+ // Convert to Spark StructType
+ val structType =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
+ // Verify structure
+ assert(structType.fields.length == 2)
+ assert(structType.fields(0).name == "key")
+ assert(structType.fields(0).dataType == StringType)
+ assert(!structType.fields(0).nullable)
+
+ assert(structType.fields(1).name == "nested")
+ assert(structType.fields(1).dataType.isInstanceOf[StructType])
+
+ val nestedStruct = structType.fields(1).dataType.asInstanceOf[StructType]
+ assert(nestedStruct.fields.length == 2)
+ assert(nestedStruct.fields(0).name == "innerKey")
+ assert(nestedStruct.fields(0).dataType == StringType)
+ assert(!nestedStruct.fields(0).nullable)
+
+ assert(nestedStruct.fields(1).name == "value")
+ assert(nestedStruct.fields(1).dataType == LongType)
+ assert(nestedStruct.fields(1).nullable)
+ }
+
+ test("test roundtrip conversion preserves schema structure") {
+ val originalStruct = new StructType()
+ .add("id", "long", false)
+ .add("name", "string", true)
+ .add("scores", ArrayType(IntegerType, containsNull = true), false)
+ .add("metadata", MapType(StringType, StringType, valueContainsNull =
true), true)
+ .add("timestamp", TimestampType, false)
+ .add("date", DateType, true)
+
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ originalStruct, "TestSchema", "test.namespace")
+ val convertedStruct =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
+ // Should be equivalent (comparing field names, types, and nullability)
+ assert(originalStruct.fields.length == convertedStruct.fields.length)
+ originalStruct.fields.zip(convertedStruct.fields).foreach { case (orig,
converted) =>
+ assert(orig.name == converted.name, s"Field name mismatch: ${orig.name}
vs ${converted.name}")
+ assert(orig.dataType == converted.dataType, s"Field ${orig.name} type
mismatch: ${orig.dataType} vs ${converted.dataType}")
+ assert(orig.nullable == converted.nullable, s"Field ${orig.name}
nullability mismatch: ${orig.nullable} vs ${converted.nullable}")
+ }
+ }
+
+ test("test convertStructTypeToHoodieSchema preserves field comments") {
+ val struct = new StructType()
+ .add("id", "long", false, "Primary identifier")
+ .add("name", "string", true, "User display name")
+ .add("nested", new StructType()
+ .add("field1", "int", false, "Nested field comment")
+ .add("field2", "string", true), false, "Nested struct comment")
+
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ struct, "TestSchema", "test.namespace")
+
+ // Verify comments are preserved
+ val idField = hoodieSchema.getField("id").get()
+ assert(idField.doc().get() == "Primary identifier")
+
+ val nameField = hoodieSchema.getField("name").get()
+ assert(nameField.doc().get() == "User display name")
+
+ val nestedField = hoodieSchema.getField("nested").get()
+ assert(nestedField.doc().get() == "Nested struct comment")
+
+ // Verify nested field comments
+ val nestedSchema = nestedField.schema()
+ val field1 = nestedSchema.getField("field1").get()
+ assert(field1.doc().get() == "Nested field comment")
+ }
+
+ test("test complex types - arrays with nullability") {
+ val struct = new StructType()
+ .add("array_non_null_elements", ArrayType(StringType, containsNull =
false), false)
+ .add("array_nullable_elements", ArrayType(StringType, containsNull =
true), false)
+ .add("nullable_array", ArrayType(IntegerType, containsNull = false),
true)
+
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ struct, "ArrayTypes", "test")
+
+ // Verify array nullability handling
+ val field1 = hoodieSchema.getField("array_non_null_elements").get()
+ assert(field1.schema().getType == HoodieSchemaType.ARRAY)
+ assert(!field1.schema().getElementType.isNullable)
+ assert(!field1.isNullable())
+
+ val field2 = hoodieSchema.getField("array_nullable_elements").get()
+ assert(field2.schema().getType == HoodieSchemaType.ARRAY)
+ assert(field2.schema().getElementType.isNullable)
+
+ val field3 = hoodieSchema.getField("nullable_array").get()
+ assert(field3.isNullable())
+
+ // Verify roundtrip
+ val convertedStruct =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
assert(!convertedStruct.fields(0).dataType.asInstanceOf[ArrayType].containsNull)
+
assert(convertedStruct.fields(1).dataType.asInstanceOf[ArrayType].containsNull)
+ assert(convertedStruct.fields(2).nullable)
+ }
+
+ test("test complex types - maps with nullability") {
+ val struct = new StructType()
+ .add("map_non_null_values", MapType(StringType, IntegerType,
valueContainsNull = false), false)
+ .add("map_nullable_values", MapType(StringType, IntegerType,
valueContainsNull = true), false)
+ .add("nullable_map", MapType(StringType, StringType, valueContainsNull =
false), true)
+
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ struct, "MapTypes", "test")
+
+ // Verify map value nullability
+ val field1 = hoodieSchema.getField("map_non_null_values").get()
+ assert(field1.schema().getType == HoodieSchemaType.MAP)
+ assert(!field1.schema().getValueType.isNullable)
+
+ val field2 = hoodieSchema.getField("map_nullable_values").get()
+ assert(field2.schema().getType == HoodieSchemaType.MAP)
+ assert(field2.schema().getValueType.isNullable)
+
+ val field3 = hoodieSchema.getField("nullable_map").get()
+ assert(field3.isNullable())
+
+ // Verify roundtrip
+ val convertedStruct =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+
assert(!convertedStruct.fields(0).dataType.asInstanceOf[MapType].valueContainsNull)
+
assert(convertedStruct.fields(1).dataType.asInstanceOf[MapType].valueContainsNull)
+ assert(convertedStruct.fields(2).nullable)
+ }
+
+ test("test arrays of complex types") {
+ val elementStruct = new StructType()
+ .add("id", "int", false)
+ .add("name", "string", true)
+
+ val struct = new StructType()
+ .add("array_of_structs", ArrayType(elementStruct, containsNull = true),
false)
+
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ struct, "ArrayOfStructs", "test")
+
+ val arrayField = hoodieSchema.getField("array_of_structs").get()
+ assert(arrayField.schema().getType == HoodieSchemaType.ARRAY)
+
+ val elementType = arrayField.schema().getElementType
+ assert(elementType.isNullable) // Elements are nullable
+ val elementRecord = elementType.getNonNullType()
+ assert(elementRecord.getType == HoodieSchemaType.RECORD)
+ assert(elementRecord.getFields.size() == 2)
+
+ // Verify roundtrip
+ val convertedStruct =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+ val convertedArrayType =
convertedStruct.fields(0).dataType.asInstanceOf[ArrayType]
+ assert(convertedArrayType.containsNull)
+ assert(convertedArrayType.elementType.isInstanceOf[StructType])
+ }
+
+ test("test maps of complex types") {
+ val valueStruct = new StructType()
+ .add("count", "long", false)
+ .add("metadata", "string", true)
+
+ val struct = new StructType()
+ .add("map_of_structs", MapType(StringType, valueStruct,
valueContainsNull = true), false)
+
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ struct, "MapOfStructs", "test")
+
+ val mapField = hoodieSchema.getField("map_of_structs").get()
+ assert(mapField.schema().getType == HoodieSchemaType.MAP)
+
+ val valueType = mapField.schema().getValueType
+ assert(valueType.isNullable) // Values are nullable
+ val valueRecord = valueType.getNonNullType()
+ assert(valueRecord.getType == HoodieSchemaType.RECORD)
+ assert(valueRecord.getFields.size() == 2)
+
+ // Verify roundtrip
+ val convertedStruct =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
+ val convertedMapType =
convertedStruct.fields(0).dataType.asInstanceOf[MapType]
+ assert(convertedMapType.valueContainsNull)
+ assert(convertedMapType.valueType.isInstanceOf[StructType])
+ }
+
+ test("test namespace hierarchy for nested records") {
+ val level2 = new StructType().add("field2", "string", false)
+ val level1 = new StructType().add("field1", "int", false).add("nested2",
level2, false)
+ val struct = new StructType().add("field0", "long", false).add("nested1",
level1, false)
+
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ struct, "Root", "com.example")
+
+ // Verify namespace hierarchy
+ assert(hoodieSchema.getNamespace().orElse(null) == "com.example")
+ assert(hoodieSchema.getName() == "Root")
+
+ val nested1 = hoodieSchema.getField("nested1").get().schema()
+ assert(nested1.getNamespace().orElse(null) == "com.example.Root")
+
+ val nested2 = nested1.getField("nested2").get().schema()
+ assert(nested2.getNamespace().orElse(null) == "com.example.Root.nested1")
+ }
+
+ test("test alignFieldsNullability with HoodieSchema") {
+ val sourceStruct = new StructType()
+ .add("field1", "string", false) // Non-nullable in source
+ .add("field2", "int", true) // Nullable in source
+ .add("nested", new StructType()
+ .add("inner1", "long", false)
+ .add("inner2", "string", true), false)
+
+ // Create HoodieSchema with different nullability
+ val nestedFields = java.util.Arrays.asList(
+ HoodieSchemaField.of("inner1",
HoodieSchema.createNullable(HoodieSchemaType.LONG)),
+ HoodieSchemaField.of("inner2",
HoodieSchema.create(HoodieSchemaType.STRING))
+ )
+ val nestedSchema = HoodieSchema.createRecord("nested", "test", null,
nestedFields)
+
+ val fields = java.util.Arrays.asList(
+ HoodieSchemaField.of("field1",
HoodieSchema.createNullable(HoodieSchemaType.STRING)), // Nullable in target
+ HoodieSchemaField.of("field2",
HoodieSchema.create(HoodieSchemaType.INT)), // Non-nullable in target
+ HoodieSchemaField.of("nested", nestedSchema)
+ )
+ val hoodieSchema = HoodieSchema.createRecord("TestSchema", "test", null,
fields)
+
+ // Align nullability
+ val alignedStruct =
HoodieSchemaConversionUtils.alignFieldsNullability(sourceStruct, hoodieSchema)
+
+ // Verify alignment (should match HoodieSchema nullability)
+ assert(alignedStruct.fields(0).nullable) // Aligned to HoodieSchema
+ assert(!alignedStruct.fields(1).nullable) // Aligned to HoodieSchema
+
+ val nestedStruct =
alignedStruct.fields(2).dataType.asInstanceOf[StructType]
+ assert(nestedStruct.fields(0).nullable) // Aligned to HoodieSchema
+ assert(!nestedStruct.fields(1).nullable) // Aligned to HoodieSchema
+ }
+
+ test("test alignFieldsNullability with complex types") {
+ val sourceStruct = new StructType()
+ .add("arrayField", ArrayType(StringType, containsNull = false), false)
+ .add("mapField", MapType(StringType, IntegerType, valueContainsNull =
false), false)
+
+ // Create HoodieSchema where element/value types are nullable
+ val fields = java.util.Arrays.asList(
+ HoodieSchemaField.of("arrayField",
HoodieSchema.createArray(HoodieSchema.createNullable(HoodieSchemaType.STRING))),
+ HoodieSchemaField.of("mapField",
HoodieSchema.createMap(HoodieSchema.createNullable(HoodieSchemaType.INT)))
+ )
+ val hoodieSchema = HoodieSchema.createRecord("ComplexNullability", "test",
null, fields)
+
+ val alignedStruct =
HoodieSchemaConversionUtils.alignFieldsNullability(sourceStruct, hoodieSchema)
+
+ // Verify that array element and map value nullability is aligned
+ val arrayType = alignedStruct.fields(0).dataType.asInstanceOf[ArrayType]
+ assert(arrayType.containsNull) // Aligned to nullable elements
+
+ val mapType = alignedStruct.fields(1).dataType.asInstanceOf[MapType]
+ assert(mapType.valueContainsNull) // Aligned to nullable values
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index 6ca8b7455e63..dbce843fa936 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Column, DataFrame,
DataFrameUtil, Dataset, HoodieUnsafeUtils, HoodieUTF8StringFactory,
Spark3DataFrameUtil, Spark3HoodieUnsafeUtils, Spark3HoodieUTF8StringFactory,
SparkSession, SQLContext}
import
org.apache.spark.sql.FileFormatUtilsForFileGroupReader.applyFiltersToPlan
-import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters,
HoodieSparkAvroSchemaConverters}
+import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters,
HoodieSparkAvroSchemaConverters, HoodieSparkSchemaConverters}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.CatalogTable
diff --git
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
index f897d00d41c6..1d0391aa219c 100644
---
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
@@ -34,7 +34,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Column, DataFrame,
DataFrameUtil, ExpressionColumnNodeWrapper, HoodieUnsafeUtils,
HoodieUTF8StringFactory, Spark4DataFrameUtil, Spark4HoodieUnsafeUtils,
Spark4HoodieUTF8StringFactory, SparkSession, SQLContext}
import
org.apache.spark.sql.FileFormatUtilsForFileGroupReader.applyFiltersToPlan
-import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters,
HoodieSparkAvroSchemaConverters}
+import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters,
HoodieSparkAvroSchemaConverters, HoodieSparkSchemaConverters}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.CatalogTable