This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3e1d3008ec14 feat(vector): Add guard for user creating nested VECTOR
(#18431)
3e1d3008ec14 is described below
commit 3e1d3008ec14693c6daeb20762111768900581b9
Author: Rahil C <[email protected]>
AuthorDate: Tue Mar 31 22:46:48 2026 -0700
feat(vector): Add guard for user creating nested VECTOR (#18431)
---
.../sql/avro/HoodieSparkSchemaConverters.scala | 30 ++++++++--
.../apache/hudi/common/schema/HoodieSchema.java | 35 +++++++++++
.../hudi/common/schema/TestHoodieSchema.java | 70 +++++++++++++++-------
.../hudi/functional/TestVectorDataSource.scala | 26 ++++++++
.../spark/sql/avro/TestSchemaConverters.scala | 50 ++++++++++++++++
5 files changed, 184 insertions(+), 27 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
index 871aefe2bec1..1ab3c5f1994d 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
@@ -60,6 +60,24 @@ object HoodieSparkSchemaConverters extends
SparkAdapterSupport {
recordName: String = "topLevelRecord",
nameSpace: String = "",
metadata: Metadata = Metadata.empty): HoodieSchema = {
+ toHoodieTypeNested(catalystType, nullable, recordName, nameSpace,
metadata, depth = 0)
+ }
+
+ /**
+ * Converts a Spark DataType to a HoodieSchema, tracking how deeply nested
the current type is
+ * relative to the top-level table schema. This depth is used to enforce
that VECTOR columns can
+ * only appear as direct fields of the root record — not inside nested
structs, arrays, or maps.
+ *
+ * The caller passes depth=0 for the root StructType. Each level of nesting
increments depth by 1,
+ * so direct fields of the root record are at depth=1 (VECTOR allowed), and
anything deeper is
+ * at depth≥2 (VECTOR not allowed).
+ */
+ private def toHoodieTypeNested(catalystType: DataType,
+ nullable: Boolean,
+ recordName: String,
+ nameSpace: String,
+ metadata: Metadata,
+ depth: Int): HoodieSchema = {
val schema = catalystType match {
// Primitive types
case BooleanType => HoodieSchema.create(HoodieSchemaType.BOOLEAN)
@@ -86,6 +104,10 @@ object HoodieSparkSchemaConverters extends
SparkAdapterSupport {
case ArrayType(elementSparkType, containsNull)
if metadata.contains(HoodieSchema.TYPE_METADATA_FIELD) &&
HoodieSchema.parseTypeDescriptor(metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)).getType
== HoodieSchemaType.VECTOR =>
+ if (depth > 1) {
+ throw new HoodieSchemaException(
+ s"VECTOR column '$recordName' must be a top-level field. Nested
VECTOR columns (inside STRUCT, ARRAY, or MAP) are not supported.")
+ }
if (containsNull) {
throw new HoodieSchemaException(
s"VECTOR type does not support nullable elements (field:
$recordName)")
@@ -107,11 +129,11 @@ object HoodieSparkSchemaConverters extends
SparkAdapterSupport {
HoodieSchema.createVector(dimension, elementType)
case ArrayType(elementType, containsNull) =>
- val elementSchema = toHoodieType(elementType, containsNull,
recordName, nameSpace, metadata)
+ val elementSchema = toHoodieTypeNested(elementType, containsNull,
recordName, nameSpace, metadata, depth + 1)
HoodieSchema.createArray(elementSchema)
case MapType(StringType, valueType, valueContainsNull) =>
- val valueSchema = toHoodieType(valueType, valueContainsNull,
recordName, nameSpace, metadata)
+ val valueSchema = toHoodieTypeNested(valueType, valueContainsNull,
recordName, nameSpace, metadata, depth + 1)
HoodieSchema.createMap(valueSchema)
case blobStruct: StructType if
metadata.contains(HoodieSchema.TYPE_METADATA_FIELD) &&
@@ -125,7 +147,7 @@ object HoodieSparkSchemaConverters extends
SparkAdapterSupport {
// Check if this might be a union (using heuristic like Avro converter)
if (canBeUnion(st)) {
val nonNullUnionFieldTypes = st.map { f =>
- toHoodieType(f.dataType, nullable = false, f.name, childNameSpace,
f.metadata)
+ toHoodieTypeNested(f.dataType, nullable = false, f.name,
childNameSpace, f.metadata, depth + 1)
}
val unionFieldTypes = if (nullable) {
(HoodieSchema.create(HoodieSchemaType.NULL) +:
nonNullUnionFieldTypes).asJava
@@ -136,7 +158,7 @@ object HoodieSparkSchemaConverters extends
SparkAdapterSupport {
} else {
// Create record
val fields = st.map { f =>
- val fieldSchema = toHoodieType(f.dataType, f.nullable, f.name,
childNameSpace, f.metadata)
+ val fieldSchema = toHoodieTypeNested(f.dataType, f.nullable,
f.name, childNameSpace, f.metadata, depth + 1)
val doc = f.getComment.orNull
// Match existing Avro SchemaConverters behavior: use NULL_VALUE
for nullable unions
// to avoid serializing "default":null in JSON representation
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
index e20aeea24911..6b6747f70a81 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieAvroSchemaException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
import lombok.Getter;
import org.apache.avro.JsonProperties;
@@ -420,6 +421,11 @@ public class HoodieSchema implements Serializable {
public static HoodieSchema createArray(HoodieSchema elementSchema) {
ValidationUtils.checkArgument(elementSchema != null, "Element schema
cannot be null");
+ if (elementSchema.getNonNullType().getType() == HoodieSchemaType.VECTOR) {
+ throw new HoodieSchemaException(
+ "VECTOR type is not supported as an array element. VECTOR columns
must be top-level fields.");
+ }
+
Schema elementAvroSchema = elementSchema.avroSchema;
ValidationUtils.checkState(elementAvroSchema != null, "Element schema's
Avro schema cannot be null");
@@ -436,6 +442,11 @@ public class HoodieSchema implements Serializable {
public static HoodieSchema createMap(HoodieSchema valueSchema) {
ValidationUtils.checkArgument(valueSchema != null, "Value schema cannot be
null");
+ if (valueSchema.getNonNullType().getType() == HoodieSchemaType.VECTOR) {
+ throw new HoodieSchemaException(
+ "VECTOR type is not supported as a map value. VECTOR columns must be
top-level fields.");
+ }
+
Schema valueAvroSchema = valueSchema.avroSchema;
ValidationUtils.checkState(valueAvroSchema != null, "Value schema's Avro
schema cannot be null");
@@ -470,6 +481,8 @@ public class HoodieSchema implements Serializable {
ValidationUtils.checkArgument(name != null && !name.isEmpty(), "Record
name cannot be null or empty");
ValidationUtils.checkArgument(fields != null, "Fields cannot be null");
+ validateNoVectorInNestedRecord(fields, false);
+
// Convert HoodieSchemaFields to Avro Fields
List<Schema.Field> avroFields = fields.stream()
.map(HoodieSchemaField::getAvroField)
@@ -480,6 +493,28 @@ public class HoodieSchema implements Serializable {
return new HoodieSchema(recordSchema, fields);
}
+ /**
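+ * Verifies that no VECTOR fields appear inside nested RECORD types.
+ * Top-level VECTOR fields (direct fields of the record being created) are
+ * allowed; VECTOR inside a child struct is not. Only RECORD-typed fields are
+ * traversed recursively; ARRAY and MAP fields are not descended into here.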
+ *
+ * @param fields the fields to validate
+ * @param nested true when validating inside a child RECORD (VECTOR throws);
false at the top level (VECTOR is allowed)
+ * @throws HoodieSchemaException if a VECTOR field is found inside a nested RECORD
+ */
+ private static void validateNoVectorInNestedRecord(List<HoodieSchemaField>
fields, boolean nested) {
+ for (HoodieSchemaField field : fields) {
+ HoodieSchema nonNull = field.schema().getNonNullType();
+ if (nested && nonNull.getType() == HoodieSchemaType.VECTOR) {
+ throw new HoodieSchemaException(
+ "VECTOR column '" + field.name() + "' must be a top-level field. "
+ + "Nested VECTOR columns (inside STRUCT, ARRAY, or MAP) are
not supported.");
+ }
+ if (nonNull.getType() == HoodieSchemaType.RECORD) {
+ validateNoVectorInNestedRecord(nonNull.getFields(), true);
+ }
+ }
+ }
+
/**
* Creates a union schema from multiple schemas.
*
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
index 6dc2833805f3..53244ff513ca 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
@@ -22,6 +22,7 @@ import
org.apache.hudi.common.schema.HoodieSchema.VariantLogicalType;
import org.apache.hudi.common.schema.HoodieSchema.VectorLogicalType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieAvroSchemaException;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalType;
@@ -1045,11 +1046,9 @@ public class TestHoodieSchema {
}
@Test
- void testVectorInNestedStructures() throws Exception {
- // Create vector schema
+ void testVectorAsTopLevelRecordField() {
HoodieSchema.Vector vectorSchema = HoodieSchema.createVector(128,
HoodieSchema.Vector.VectorElementType.FLOAT);
- // Test vector in record - verify it can be used as a field
List<HoodieSchemaField> fields = Arrays.asList(
HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
HoodieSchemaField.of("embedding", vectorSchema)
@@ -1057,13 +1056,13 @@ public class TestHoodieSchema {
HoodieSchema recordSchema = HoodieSchema.createRecord("TestRecord", null,
null, fields);
assertEquals(HoodieSchemaType.RECORD, recordSchema.getType());
- // Verify vector field is preserved in the Avro schema
+ // Verify the vector field is preserved in the Avro schema
Schema.Field embeddingField =
recordSchema.getAvroSchema().getField("embedding");
assertNotNull(embeddingField);
HoodieSchema embeddingSchema =
HoodieSchema.fromAvroSchema(embeddingField.schema());
assertVector(embeddingSchema, 128,
HoodieSchema.Vector.VectorElementType.FLOAT);
- // Round-trip record with vector field through JSON
+ // Round-trip the record with the vector field through JSON
String recordJson = recordSchema.toString();
HoodieSchema parsedRecord = HoodieSchema.parse(recordJson);
assertEquals(recordSchema, parsedRecord);
@@ -1071,28 +1070,36 @@ public class TestHoodieSchema {
assertNotNull(parsedEmbeddingField);
HoodieSchema parsedEmbedding =
HoodieSchema.fromAvroSchema(parsedEmbeddingField.schema());
assertVector(parsedEmbedding, 128,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ }
- // Test vector in array
- HoodieSchema arraySchema = HoodieSchema.createArray(vectorSchema);
- assertEquals(HoodieSchemaType.ARRAY, arraySchema.getType());
- assertVector(arraySchema.getElementType(), 128,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ @Test
+ void testVectorAsArrayElementThrows() {
+ HoodieSchema.Vector vectorSchema = HoodieSchema.createVector(128,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ HoodieSchemaException ex = assertThrows(HoodieSchemaException.class,
+ () -> HoodieSchema.createArray(vectorSchema));
+ assertEquals("VECTOR type is not supported as an array element. VECTOR
columns must be top-level fields.", ex.getMessage());
+ }
- // Round-trip array of vectors through JSON
- String arrayJson = arraySchema.toString();
- HoodieSchema parsedArray = HoodieSchema.parse(arrayJson);
- assertEquals(arraySchema, parsedArray);
- assertVector(parsedArray.getElementType(), 128,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ @Test
+ void testVectorAsMapValueThrows() {
+ HoodieSchema.Vector vectorSchema = HoodieSchema.createVector(128,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ HoodieSchemaException ex = assertThrows(HoodieSchemaException.class,
+ () -> HoodieSchema.createMap(vectorSchema));
+ assertEquals("VECTOR type is not supported as a map value. VECTOR columns
must be top-level fields.", ex.getMessage());
+ }
- // Test vector in map
- HoodieSchema mapSchema = HoodieSchema.createMap(vectorSchema);
- assertEquals(HoodieSchemaType.MAP, mapSchema.getType());
- assertVector(mapSchema.getValueType(), 128,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ @Test
+ void testVectorInNestedRecordThrows() {
+ HoodieSchema.Vector vectorSchema = HoodieSchema.createVector(128,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ HoodieSchema innerRecord = HoodieSchema.createRecord("inner", null, null,
+ Arrays.asList(HoodieSchemaField.of("embedding", vectorSchema)));
- // Round-trip map with vector values through JSON
- String mapJson = mapSchema.toString();
- HoodieSchema parsedMap = HoodieSchema.parse(mapJson);
- assertEquals(mapSchema, parsedMap);
- assertVector(parsedMap.getValueType(), 128,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ HoodieSchemaException ex = assertThrows(HoodieSchemaException.class, () ->
+ HoodieSchema.createRecord("outer", null, null, Arrays.asList(
+ HoodieSchemaField.of("id",
HoodieSchema.create(HoodieSchemaType.INT)),
+ HoodieSchemaField.of("data", innerRecord))));
+ assertEquals("VECTOR column 'embedding' must be a top-level field. "
+ + "Nested VECTOR columns (inside STRUCT, ARRAY, or MAP) are not
supported.", ex.getMessage());
}
@Test
@@ -2845,4 +2852,21 @@ public class TestHoodieSchema {
assertEquals(HoodieSchemaType.BLOB, parsed.getType());
assertInstanceOf(HoodieSchema.Blob.class, parsed);
}
+
+ @Test
+ public void testCreateArrayWithNullableVectorThrows() {
+ HoodieSchema vectorSchema =
HoodieSchema.createNullable(HoodieSchema.createVector(128));
+ HoodieSchemaException ex = assertThrows(HoodieSchemaException.class,
+ () -> HoodieSchema.createArray(vectorSchema));
+ assertEquals("VECTOR type is not supported as an array element. VECTOR
columns must be top-level fields.", ex.getMessage());
+ }
+
+ @Test
+ public void testCreateMapWithNullableVectorThrows() {
+ HoodieSchema vectorSchema =
HoodieSchema.createNullable(HoodieSchema.createVector(128));
+ HoodieSchemaException ex = assertThrows(HoodieSchemaException.class,
+ () -> HoodieSchema.createMap(vectorSchema));
+ assertEquals("VECTOR type is not supported as a map value. VECTOR columns
must be top-level fields.", ex.getMessage());
+ }
+
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
index 0522950f0d9f..8ef97c167b9f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
@@ -991,10 +991,36 @@ class TestVectorDataSource extends
HoodieSparkClientTestBase {
}
}
+ @Test
+ def testNestedVectorWriteThrows(): Unit = {
+ // A VECTOR nested inside a struct field must be rejected at write time.
+ val meta = vectorMetadata("VECTOR(4)")
+ val nestedStruct = StructType(Seq(
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
nullable = false, metadata = meta)
+ ))
+ val schema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("data", nestedStruct, nullable = false)
+ ))
+ val data = Seq(Row("key_1", Row(Seq(1.0f, 2.0f, 3.0f, 4.0f))))
+ val df = spark.createDataFrame(spark.sparkContext.parallelize(data),
schema)
+
+ val ex = assertThrows(classOf[Exception], () => {
+ writeHudiTable(df, "nested_vector_test", basePath + "/nested_vector")
+ })
+ assertTrue(nestedVectorMessageInCauseChain(ex),
+ s"Expected nested VECTOR guard to fire, but got: ${ex.getMessage}")
+ }
+
private def assertArrayEquals(expected: Array[Byte], actual: Array[Byte],
message: String): Unit = {
assertEquals(expected.length, actual.length, s"$message: length mismatch")
expected.zip(actual).zipWithIndex.foreach { case ((e, a), idx) =>
assertEquals(e, a, s"$message: mismatch at index $idx")
}
}
+
+ private def nestedVectorMessageInCauseChain(ex: Throwable): Boolean =
+ ex != null && (Option(ex.getMessage).exists(_.contains(
+ "VECTOR column 'embedding' must be a top-level field. Nested VECTOR
columns (inside STRUCT, ARRAY, or MAP) are not supported."))
+ || nestedVectorMessageInCauseChain(ex.getCause))
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
index ae3b46b08eb5..c27fa2d2878f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
@@ -249,6 +249,56 @@ class TestSchemaConverters extends SparkAdapterSupport {
assertEquals(HoodieSchemaType.VARIANT,
nullableField.schema().getNonNullType.getType)
}
+ @Test
+ def testTopLevelVectorStillAllowed(): Unit = {
+ val vectorMetadata = new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(4)")
+ .build()
+ val sparkType = new StructType(Array[StructField](
+ StructField("id", DataTypes.LongType, nullable = false),
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
nullable = false, metadata = vectorMetadata)
+ ))
+ val hoodieSchema = HoodieSparkSchemaConverters.toHoodieType(sparkType,
recordName = "record")
+ assertEquals(HoodieSchemaType.VECTOR,
hoodieSchema.getField("embedding").get().schema().getType)
+ }
+
+ @Test
+ def testVectorInNestedStructThrows(): Unit = {
+ val vectorMetadata = new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(4)")
+ .build()
+ // Outer struct has a nested struct whose field is a VECTOR
+ val innerStruct = new StructType(Array[StructField](
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
nullable = false, metadata = vectorMetadata)
+ ))
+ val outerStruct = new StructType(Array[StructField](
+ StructField("id", DataTypes.LongType, nullable = false),
+ StructField("data", innerStruct, nullable = false)
+ ))
+ val exception = assertThrows(classOf[HoodieSchemaException], () => {
+ HoodieSparkSchemaConverters.toHoodieType(outerStruct, recordName =
"record")
+ })
+ assertEquals("VECTOR column 'embedding' must be a top-level field. Nested
VECTOR columns (inside STRUCT, ARRAY, or MAP) are not supported.",
exception.getMessage)
+ }
+
+ @Test
+ def testVectorInsideArrayOfStructsThrows(): Unit = {
+ // VECTOR nested inside an array of structs: ARRAY<STRUCT<embedding
VECTOR(4)>>
+ val vectorMetadata = new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD, "VECTOR(4)")
+ .build()
+ val innerStruct = new StructType(Array[StructField](
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
nullable = false, metadata = vectorMetadata)
+ ))
+ val outerStruct = new StructType(Array[StructField](
+ StructField("items", ArrayType(innerStruct, containsNull = false),
nullable = false)
+ ))
+ val exception = assertThrows(classOf[HoodieSchemaException], () => {
+ HoodieSparkSchemaConverters.toHoodieType(outerStruct, recordName =
"record")
+ })
+ assertEquals("VECTOR column 'embedding' must be a top-level field. Nested
VECTOR columns (inside STRUCT, ARRAY, or MAP) are not supported.",
exception.getMessage)
+ }
+
/**
* Validates the content of the blob fields to ensure the fields match our
expectations.
* @param dataType the StructType containing the blob fields to validate