This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e1d3008ec14 feat(vector): Add guard for user creating nested VECTOR 
(#18431)
3e1d3008ec14 is described below

commit 3e1d3008ec14693c6daeb20762111768900581b9
Author: Rahil C <[email protected]>
AuthorDate: Tue Mar 31 22:46:48 2026 -0700

    feat(vector): Add guard for user creating nested VECTOR (#18431)
---
 .../sql/avro/HoodieSparkSchemaConverters.scala     | 30 ++++++++--
 .../apache/hudi/common/schema/HoodieSchema.java    | 35 +++++++++++
 .../hudi/common/schema/TestHoodieSchema.java       | 70 +++++++++++++++-------
 .../hudi/functional/TestVectorDataSource.scala     | 26 ++++++++
 .../spark/sql/avro/TestSchemaConverters.scala      | 50 ++++++++++++++++
 5 files changed, 184 insertions(+), 27 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
index 871aefe2bec1..1ab3c5f1994d 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
@@ -60,6 +60,24 @@ object HoodieSparkSchemaConverters extends 
SparkAdapterSupport {
                    recordName: String = "topLevelRecord",
                    nameSpace: String = "",
                    metadata: Metadata = Metadata.empty): HoodieSchema = {
+    toHoodieTypeNested(catalystType, nullable, recordName, nameSpace, 
metadata, depth = 0)
+  }
+
+  /**
+   * Converts a Spark DataType to a HoodieSchema, tracking how deeply nested 
the current type is
+   * relative to the top-level table schema. This depth is used to enforce 
that VECTOR columns can
+   * only appear as direct fields of the root record — not inside nested 
structs, arrays, or maps.
+   *
+   * The caller passes depth=0 for the root StructType. Each level of nesting 
increments depth by 1,
+   * so direct fields of the root record are at depth=1 (VECTOR allowed), and 
anything deeper is
+   * at depth≥2 (VECTOR not allowed).
+   */
+  private def toHoodieTypeNested(catalystType: DataType,
+                                  nullable: Boolean,
+                                  recordName: String,
+                                  nameSpace: String,
+                                  metadata: Metadata,
+                                  depth: Int): HoodieSchema = {
     val schema = catalystType match {
       // Primitive types
       case BooleanType => HoodieSchema.create(HoodieSchemaType.BOOLEAN)
@@ -86,6 +104,10 @@ object HoodieSparkSchemaConverters extends 
SparkAdapterSupport {
       case ArrayType(elementSparkType, containsNull)
           if metadata.contains(HoodieSchema.TYPE_METADATA_FIELD) &&
             
HoodieSchema.parseTypeDescriptor(metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).getType
 == HoodieSchemaType.VECTOR =>
+        if (depth > 1) {
+          throw new HoodieSchemaException(
+            s"VECTOR column '$recordName' must be a top-level field. Nested 
VECTOR columns (inside STRUCT, ARRAY, or MAP) are not supported.")
+        }
         if (containsNull) {
           throw new HoodieSchemaException(
             s"VECTOR type does not support nullable elements (field: 
$recordName)")
@@ -107,11 +129,11 @@ object HoodieSparkSchemaConverters extends 
SparkAdapterSupport {
         HoodieSchema.createVector(dimension, elementType)
 
       case ArrayType(elementType, containsNull) =>
-        val elementSchema = toHoodieType(elementType, containsNull, 
recordName, nameSpace, metadata)
+        val elementSchema = toHoodieTypeNested(elementType, containsNull, 
recordName, nameSpace, metadata, depth + 1)
         HoodieSchema.createArray(elementSchema)
 
       case MapType(StringType, valueType, valueContainsNull) =>
-        val valueSchema = toHoodieType(valueType, valueContainsNull, 
recordName, nameSpace, metadata)
+        val valueSchema = toHoodieTypeNested(valueType, valueContainsNull, 
recordName, nameSpace, metadata, depth + 1)
         HoodieSchema.createMap(valueSchema)
 
       case blobStruct: StructType if 
metadata.contains(HoodieSchema.TYPE_METADATA_FIELD) &&
@@ -125,7 +147,7 @@ object HoodieSparkSchemaConverters extends 
SparkAdapterSupport {
         // Check if this might be a union (using heuristic like Avro converter)
         if (canBeUnion(st)) {
           val nonNullUnionFieldTypes = st.map { f =>
-            toHoodieType(f.dataType, nullable = false, f.name, childNameSpace, 
f.metadata)
+            toHoodieTypeNested(f.dataType, nullable = false, f.name, 
childNameSpace, f.metadata, depth + 1)
           }
           val unionFieldTypes = if (nullable) {
             (HoodieSchema.create(HoodieSchemaType.NULL) +: 
nonNullUnionFieldTypes).asJava
@@ -136,7 +158,7 @@ object HoodieSparkSchemaConverters extends 
SparkAdapterSupport {
         } else {
           // Create record
           val fields = st.map { f =>
-            val fieldSchema = toHoodieType(f.dataType, f.nullable, f.name, 
childNameSpace, f.metadata)
+            val fieldSchema = toHoodieTypeNested(f.dataType, f.nullable, 
f.name, childNameSpace, f.metadata, depth + 1)
             val doc = f.getComment.orNull
             // Match existing Avro SchemaConverters behavior: use NULL_VALUE 
for nullable unions
             // to avoid serializing "default":null in JSON representation
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
index e20aeea24911..6b6747f70a81 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieAvroSchemaException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
 
 import lombok.Getter;
 import org.apache.avro.JsonProperties;
@@ -420,6 +421,11 @@ public class HoodieSchema implements Serializable {
   public static HoodieSchema createArray(HoodieSchema elementSchema) {
     ValidationUtils.checkArgument(elementSchema != null, "Element schema 
cannot be null");
 
+    if (elementSchema.getNonNullType().getType() == HoodieSchemaType.VECTOR) {
+      throw new HoodieSchemaException(
+          "VECTOR type is not supported as an array element. VECTOR columns 
must be top-level fields.");
+    }
+
     Schema elementAvroSchema = elementSchema.avroSchema;
     ValidationUtils.checkState(elementAvroSchema != null, "Element schema's 
Avro schema cannot be null");
 
@@ -436,6 +442,11 @@ public class HoodieSchema implements Serializable {
   public static HoodieSchema createMap(HoodieSchema valueSchema) {
     ValidationUtils.checkArgument(valueSchema != null, "Value schema cannot be 
null");
 
+    if (valueSchema.getNonNullType().getType() == HoodieSchemaType.VECTOR) {
+      throw new HoodieSchemaException(
+          "VECTOR type is not supported as a map value. VECTOR columns must be 
top-level fields.");
+    }
+
     Schema valueAvroSchema = valueSchema.avroSchema;
     ValidationUtils.checkState(valueAvroSchema != null, "Value schema's Avro 
schema cannot be null");
 
@@ -470,6 +481,8 @@ public class HoodieSchema implements Serializable {
     ValidationUtils.checkArgument(name != null && !name.isEmpty(), "Record 
name cannot be null or empty");
     ValidationUtils.checkArgument(fields != null, "Fields cannot be null");
 
+    validateNoVectorInNestedRecord(fields, false);
+
     // Convert HoodieSchemaFields to Avro Fields
     List<Schema.Field> avroFields = fields.stream()
         .map(HoodieSchemaField::getAvroField)
@@ -480,6 +493,28 @@ public class HoodieSchema implements Serializable {
     return new HoodieSchema(recordSchema, fields);
   }
 
+  /**
+   * Verifies that no VECTOR fields appear inside nested RECORD types. 
Top-level VECTOR fields
+   * (direct fields of the record being created) are allowed; VECTOR inside a 
child struct is not.
+   *
+   * @param fields the fields to validate
+   * @param nested true when validating inside a child RECORD (VECTOR throws); 
false at the top level (VECTOR is allowed)
+   * @throws HoodieSchemaException if a VECTOR field is found inside a nested RECORD (at any nesting depth)
+   */
+  private static void validateNoVectorInNestedRecord(List<HoodieSchemaField> 
fields, boolean nested) {
+    for (HoodieSchemaField field : fields) {
+      HoodieSchema nonNull = field.schema().getNonNullType();
+      if (nested && nonNull.getType() == HoodieSchemaType.VECTOR) {
+        throw new HoodieSchemaException(
+            "VECTOR column '" + field.name() + "' must be a top-level field. "
+                + "Nested VECTOR columns (inside STRUCT, ARRAY, or MAP) are 
not supported.");
+      }
+      if (nonNull.getType() == HoodieSchemaType.RECORD) {
+        validateNoVectorInNestedRecord(nonNull.getFields(), true);
+      }
+    }
+  }
+
   /**
    * Creates a union schema from multiple schemas.
    *
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java 
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
index 6dc2833805f3..53244ff513ca 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
@@ -22,6 +22,7 @@ import 
org.apache.hudi.common.schema.HoodieSchema.VariantLogicalType;
 import org.apache.hudi.common.schema.HoodieSchema.VectorLogicalType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieAvroSchemaException;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
 
 import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalType;
@@ -1045,11 +1046,9 @@ public class TestHoodieSchema {
   }
 
   @Test
-  void testVectorInNestedStructures() throws Exception {
-    // Create vector schema
+  void testVectorAsTopLevelRecordField() {
     HoodieSchema.Vector vectorSchema = HoodieSchema.createVector(128, 
HoodieSchema.Vector.VectorElementType.FLOAT);
 
-    // Test vector in record - verify it can be used as a field
     List<HoodieSchemaField> fields = Arrays.asList(
         HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
         HoodieSchemaField.of("embedding", vectorSchema)
@@ -1057,13 +1056,13 @@ public class TestHoodieSchema {
     HoodieSchema recordSchema = HoodieSchema.createRecord("TestRecord", null, 
null, fields);
     assertEquals(HoodieSchemaType.RECORD, recordSchema.getType());
 
-    // Verify vector field is preserved in the Avro schema
+    // Verify the vector field is preserved in the Avro schema
     Schema.Field embeddingField = 
recordSchema.getAvroSchema().getField("embedding");
     assertNotNull(embeddingField);
     HoodieSchema embeddingSchema = 
HoodieSchema.fromAvroSchema(embeddingField.schema());
     assertVector(embeddingSchema, 128, 
HoodieSchema.Vector.VectorElementType.FLOAT);
 
-    // Round-trip record with vector field through JSON
+    // Round-trip the record with the vector field through JSON
     String recordJson = recordSchema.toString();
     HoodieSchema parsedRecord = HoodieSchema.parse(recordJson);
     assertEquals(recordSchema, parsedRecord);
@@ -1071,28 +1070,36 @@ public class TestHoodieSchema {
     assertNotNull(parsedEmbeddingField);
     HoodieSchema parsedEmbedding = 
HoodieSchema.fromAvroSchema(parsedEmbeddingField.schema());
     assertVector(parsedEmbedding, 128, 
HoodieSchema.Vector.VectorElementType.FLOAT);
+  }
 
-    // Test vector in array
-    HoodieSchema arraySchema = HoodieSchema.createArray(vectorSchema);
-    assertEquals(HoodieSchemaType.ARRAY, arraySchema.getType());
-    assertVector(arraySchema.getElementType(), 128, 
HoodieSchema.Vector.VectorElementType.FLOAT);
+  @Test
+  void testVectorAsArrayElementThrows() {
+    HoodieSchema.Vector vectorSchema = HoodieSchema.createVector(128, 
HoodieSchema.Vector.VectorElementType.FLOAT);
+    HoodieSchemaException ex = assertThrows(HoodieSchemaException.class,
+        () -> HoodieSchema.createArray(vectorSchema));
+    assertEquals("VECTOR type is not supported as an array element. VECTOR 
columns must be top-level fields.", ex.getMessage());
+  }
 
-    // Round-trip array of vectors through JSON
-    String arrayJson = arraySchema.toString();
-    HoodieSchema parsedArray = HoodieSchema.parse(arrayJson);
-    assertEquals(arraySchema, parsedArray);
-    assertVector(parsedArray.getElementType(), 128, 
HoodieSchema.Vector.VectorElementType.FLOAT);
+  @Test
+  void testVectorAsMapValueThrows() {
+    HoodieSchema.Vector vectorSchema = HoodieSchema.createVector(128, 
HoodieSchema.Vector.VectorElementType.FLOAT);
+    HoodieSchemaException ex = assertThrows(HoodieSchemaException.class,
+        () -> HoodieSchema.createMap(vectorSchema));
+    assertEquals("VECTOR type is not supported as a map value. VECTOR columns 
must be top-level fields.", ex.getMessage());
+  }
 
-    // Test vector in map
-    HoodieSchema mapSchema = HoodieSchema.createMap(vectorSchema);
-    assertEquals(HoodieSchemaType.MAP, mapSchema.getType());
-    assertVector(mapSchema.getValueType(), 128, 
HoodieSchema.Vector.VectorElementType.FLOAT);
+  @Test
+  void testVectorInNestedRecordThrows() {
+    HoodieSchema.Vector vectorSchema = HoodieSchema.createVector(128, 
HoodieSchema.Vector.VectorElementType.FLOAT);
+    HoodieSchema innerRecord = HoodieSchema.createRecord("inner", null, null,
+        Arrays.asList(HoodieSchemaField.of("embedding", vectorSchema)));
 
-    // Round-trip map with vector values through JSON
-    String mapJson = mapSchema.toString();
-    HoodieSchema parsedMap = HoodieSchema.parse(mapJson);
-    assertEquals(mapSchema, parsedMap);
-    assertVector(parsedMap.getValueType(), 128, 
HoodieSchema.Vector.VectorElementType.FLOAT);
+    HoodieSchemaException ex = assertThrows(HoodieSchemaException.class, () ->
+        HoodieSchema.createRecord("outer", null, null, Arrays.asList(
+            HoodieSchemaField.of("id", 
HoodieSchema.create(HoodieSchemaType.INT)),
+            HoodieSchemaField.of("data", innerRecord))));
+    assertEquals("VECTOR column 'embedding' must be a top-level field. "
+        + "Nested VECTOR columns (inside STRUCT, ARRAY, or MAP) are not 
supported.", ex.getMessage());
   }
 
   @Test
@@ -2845,4 +2852,21 @@ public class TestHoodieSchema {
     assertEquals(HoodieSchemaType.BLOB, parsed.getType());
     assertInstanceOf(HoodieSchema.Blob.class, parsed);
   }
+
+  @Test
+  public void testCreateArrayWithNullableVectorThrows() {
+    HoodieSchema vectorSchema = 
HoodieSchema.createNullable(HoodieSchema.createVector(128));
+    HoodieSchemaException ex = assertThrows(HoodieSchemaException.class,
+        () -> HoodieSchema.createArray(vectorSchema));
+    assertEquals("VECTOR type is not supported as an array element. VECTOR 
columns must be top-level fields.", ex.getMessage());
+  }
+
+  @Test
+  public void testCreateMapWithNullableVectorThrows() {
+    HoodieSchema vectorSchema = 
HoodieSchema.createNullable(HoodieSchema.createVector(128));
+    HoodieSchemaException ex = assertThrows(HoodieSchemaException.class,
+        () -> HoodieSchema.createMap(vectorSchema));
+    assertEquals("VECTOR type is not supported as a map value. VECTOR columns 
must be top-level fields.", ex.getMessage());
+  }
+
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
index 0522950f0d9f..8ef97c167b9f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
@@ -991,10 +991,36 @@ class TestVectorDataSource extends 
HoodieSparkClientTestBase {
     }
   }
 
+  @Test
+  def testNestedVectorWriteThrows(): Unit = {
+    // A VECTOR nested inside a struct field must be rejected at write time.
+    val meta = vectorMetadata("VECTOR(4)")
+    val nestedStruct = StructType(Seq(
+      StructField("embedding", ArrayType(FloatType, containsNull = false), 
nullable = false, metadata = meta)
+    ))
+    val schema = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("data", nestedStruct, nullable = false)
+    ))
+    val data = Seq(Row("key_1", Row(Seq(1.0f, 2.0f, 3.0f, 4.0f))))
+    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), 
schema)
+
+    val ex = assertThrows(classOf[Exception], () => {
+      writeHudiTable(df, "nested_vector_test", basePath + "/nested_vector")
+    })
+    assertTrue(nestedVectorMessageInCauseChain(ex),
+      s"Expected nested VECTOR guard to fire, but got: ${ex.getMessage}")
+  }
+
   private def assertArrayEquals(expected: Array[Byte], actual: Array[Byte], 
message: String): Unit = {
     assertEquals(expected.length, actual.length, s"$message: length mismatch")
     expected.zip(actual).zipWithIndex.foreach { case ((e, a), idx) =>
       assertEquals(e, a, s"$message: mismatch at index $idx")
     }
   }
+
+  private def nestedVectorMessageInCauseChain(ex: Throwable): Boolean =
+    ex != null && (Option(ex.getMessage).exists(_.contains(
+      "VECTOR column 'embedding' must be a top-level field. Nested VECTOR 
columns (inside STRUCT, ARRAY, or MAP) are not supported."))
+      || nestedVectorMessageInCauseChain(ex.getCause))
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
index ae3b46b08eb5..c27fa2d2878f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
@@ -249,6 +249,56 @@ class TestSchemaConverters extends SparkAdapterSupport {
     assertEquals(HoodieSchemaType.VARIANT, 
nullableField.schema().getNonNullType.getType)
   }
 
+  @Test
+  def testTopLevelVectorStillAllowed(): Unit = {
+    val vectorMetadata = new MetadataBuilder()
+      .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(4)")
+      .build()
+    val sparkType = new StructType(Array[StructField](
+      StructField("id", DataTypes.LongType, nullable = false),
+      StructField("embedding", ArrayType(FloatType, containsNull = false), 
nullable = false, metadata = vectorMetadata)
+    ))
+    val hoodieSchema = HoodieSparkSchemaConverters.toHoodieType(sparkType, 
recordName = "record")
+    assertEquals(HoodieSchemaType.VECTOR, 
hoodieSchema.getField("embedding").get().schema().getType)
+  }
+
+  @Test
+  def testVectorInNestedStructThrows(): Unit = {
+    val vectorMetadata = new MetadataBuilder()
+      .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(4)")
+      .build()
+    // Outer struct has a nested struct whose field is a VECTOR
+    val innerStruct = new StructType(Array[StructField](
+      StructField("embedding", ArrayType(FloatType, containsNull = false), 
nullable = false, metadata = vectorMetadata)
+    ))
+    val outerStruct = new StructType(Array[StructField](
+      StructField("id", DataTypes.LongType, nullable = false),
+      StructField("data", innerStruct, nullable = false)
+    ))
+    val exception = assertThrows(classOf[HoodieSchemaException], () => {
+      HoodieSparkSchemaConverters.toHoodieType(outerStruct, recordName = 
"record")
+    })
+    assertEquals("VECTOR column 'embedding' must be a top-level field. Nested 
VECTOR columns (inside STRUCT, ARRAY, or MAP) are not supported.", 
exception.getMessage)
+  }
+
+  @Test
+  def testVectorInsideArrayOfStructsThrows(): Unit = {
+    // VECTOR nested inside an array of structs: ARRAY<STRUCT<embedding 
VECTOR(4)>>
+    val vectorMetadata = new MetadataBuilder()
+      .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(4)")
+      .build()
+    val innerStruct = new StructType(Array[StructField](
+      StructField("embedding", ArrayType(FloatType, containsNull = false), 
nullable = false, metadata = vectorMetadata)
+    ))
+    val outerStruct = new StructType(Array[StructField](
+      StructField("items", ArrayType(innerStruct, containsNull = false), 
nullable = false)
+    ))
+    val exception = assertThrows(classOf[HoodieSchemaException], () => {
+      HoodieSparkSchemaConverters.toHoodieType(outerStruct, recordName = 
"record")
+    })
+    assertEquals("VECTOR column 'embedding' must be a top-level field. Nested 
VECTOR columns (inside STRUCT, ARRAY, or MAP) are not supported.", 
exception.getMessage)
+  }
+
   /**
    * Validates the content of the blob fields to ensure the fields match our 
expectations.
    * @param dataType the StructType containing the blob fields to validate

Reply via email to