This is an automated email from the ASF dual-hosted git repository.
vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3fc1deb68b0f feat(vector): Support writing VECTOR to parquet and avro formats using Spark (#18328)
3fc1deb68b0f is described below
commit 3fc1deb68b0fe7cb5291aa3231deee69f003648c
Author: Rahil C <[email protected]>
AuthorDate: Fri Mar 27 05:49:47 2026 -0400
feat(vector): Support writing VECTOR to parquet and avro formats using Spark (#18328)
* further changes for getting parquet to write hudi vectors
* trying to resolve type issues during read/write
* finally able to write from spark to hudi to parquet vectors and then read them back
* get things working
* fix vector write path to support all element types, add column projection test
- Write path (HoodieRowParquetWriteSupport.makeWriter) now switches on VectorElementType (FLOAT/DOUBLE/INT8) instead of hardcoding float, matching the read paths
- Remove redundant detectVectorColumns call in readBaseFile by reusing vectorCols from requiredSchema for requestedSchema
- Add testColumnProjectionWithVector covering 3 scenarios: exclude vector, vector-only, and all columns
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
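The write-path change above (switching on the vector element type instead of hardcoding float) can be sketched in isolation. This is a hypothetical, standalone illustration with invented names (ElemType, encode); it is not Hudi's VectorElementType code, only the same pattern of dispatching the per-element encoder by type:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Standalone sketch: encode a vector into a fixed-size little-endian
// byte buffer, switching on the element type the way the write path
// switches on VectorElementType (FLOAT/DOUBLE/INT8).
public class VectorEncodeSketch {
  enum ElemType {
    FLOAT(4), DOUBLE(8), INT8(1);
    final int size;
    ElemType(int s) { size = s; }
  }

  static byte[] encode(double[] values, ElemType elemType) {
    ByteBuffer buf = ByteBuffer.allocate(values.length * elemType.size)
        .order(ByteOrder.LITTLE_ENDIAN);
    for (double v : values) {
      switch (elemType) {
        case FLOAT:  buf.putFloat((float) v); break;
        case DOUBLE: buf.putDouble(v);        break;
        case INT8:   buf.put((byte) v);       break;
      }
    }
    return buf.array();
  }

  public static void main(String[] args) {
    // Payload size is dimension * element size, regardless of content
    System.out.println(encode(new double[] {1.0, 2.0}, ElemType.FLOAT).length); // 8
    System.out.println(encode(new double[] {1, 2, 3}, ElemType.INT8).length);   // 3
  }
}
```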
* Address PR review comments for vector Parquet read/write support
- Use VectorLogicalType.VECTOR_BYTE_ORDER instead of hardcoded ByteOrder.LITTLE_ENDIAN in all 4 locations (write support, reader, Scala reader context, file group format)
- Add Math.multiplyExact overflow guard for dimension * elementSize in HoodieRowParquetWriteSupport
- Remove unnecessary array clone in HoodieSparkParquetReader
- Add clarifying comment on non-vector column else branch
- Fix misleading "float arrays" comment to "typed arrays"
- Move inline JavaConverters import to top-level in SparkFileFormatInternalRowReaderContext
- Import Metadata at top level instead of fully-qualified reference
- Consolidate duplicate detectVectorColumns, replaceVectorColumnsWithBinary, and convertBinaryToVectorArray into the SparkFileFormatInternalRowReaderContext companion object; HoodieFileGroupReaderBasedFileFormat now delegates
- Add Javadoc on VectorType explaining it's needed for the InternalSchema type hierarchy (cannot reuse HoodieSchema.Vector)
- Clean up unused imports (ByteOrder, ByteBuffer, GenericArrayData, StructField, BinaryType, HoodieSchemaType)
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
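A quick sketch of why the Math.multiplyExact guard mentioned above matters: a plain int multiply of dimension * elementSize silently wraps on overflow, while multiplyExact throws. The numbers below are illustrative, not taken from Hudi:

```java
// Demonstrates silent int overflow vs. Math.multiplyExact's explicit failure.
public class OverflowGuardSketch {
  public static void main(String[] args) {
    int dimension = Integer.MAX_VALUE / 2; // a pathologically large dimension
    int elementSize = 8;                   // e.g. a DOUBLE element

    int wrapped = dimension * elementSize; // silent two's-complement wrap
    System.out.println(wrapped < 0);       // true: result wrapped negative

    try {
      Math.multiplyExact(dimension, elementSize);
      System.out.println("no overflow");
    } catch (ArithmeticException e) {
      System.out.println("overflow detected"); // this branch runs
    }
  }
}
```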
* Add comprehensive vector test coverage for all element types and table types
New tests added to TestVectorDataSource:
- testDoubleVectorRoundTrip: DOUBLE element type end-to-end (64-dim)
- testInt8VectorRoundTrip: INT8/byte element type end-to-end (256-dim)
- testMultipleVectorColumns: two vector columns (float + double) in the same schema with selective nulls and per-column projection
- testMorTableWithVectors: MOR table type with insert + upsert, verifying the merged view returns correct vectors
- testCowUpsertWithVectors: COW upsert (update existing + insert new) verifying vector values after merge
- testLargeDimensionVector: 1536-dim float vectors (OpenAI embedding size) to exercise large buffer allocation
- testSmallDimensionVector: 2-dim vectors with edge values (Float.MaxValue) to verify boundary handling
- testVectorWithNonVectorArrayColumn: vector column alongside a regular ArrayType(StringType) to ensure non-vector arrays are not incorrectly treated as vectors
- testMorWithMultipleUpserts: MOR with 3 successive upsert batches of DOUBLE vectors, verifying the latest value wins per key
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
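The round trip these tests exercise reduces to a fixed-size little-endian encode/decode with a length check. This is a hypothetical standalone sketch (encode/decode are invented helper names, not Hudi APIs):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Round-trip sketch: floats -> fixed-size little-endian bytes -> floats.
public class VectorRoundTripSketch {
  static byte[] encode(float[] values) {
    ByteBuffer buf = ByteBuffer.allocate(values.length * Float.BYTES)
        .order(ByteOrder.LITTLE_ENDIAN);
    for (float f : values) {
      buf.putFloat(f);
    }
    return buf.array();
  }

  static float[] decode(byte[] bytes, int dim) {
    // Length check guards against corrupt fixed-size payloads
    if (bytes.length != dim * Float.BYTES) {
      throw new IllegalArgumentException("length mismatch");
    }
    ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
    float[] out = new float[dim];
    for (int i = 0; i < dim; i++) {
      out[i] = buf.getFloat();
    }
    return out;
  }

  public static void main(String[] args) {
    // Edge value included, mirroring the boundary-handling test above
    float[] input = {0.5f, -1.25f, Float.MAX_VALUE};
    float[] output = decode(encode(input), input.length);
    System.out.println(Arrays.equals(input, output)); // true
  }
}
```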
* Handle vector schema checks and skip vector column stats
* Extract VectorConversionUtils, add write-path dimension validation, fix hot-path allocation
- Create shared VectorConversionUtils utility class to eliminate duplicated vector conversion logic across HoodieSparkParquetReader, SparkFileFormatInternalRowReaderContext, and HoodieFileGroupReaderBasedFileFormat
- Add explicit dimension validation in HoodieRowParquetWriteSupport to prevent silent data corruption when array length doesn't match the declared vector dimension
- Reuse GenericInternalRow in HoodieSparkParquetReader's vector post-processing loop to reduce GC pressure on large scans
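The hot-path pattern described in the last bullet (one pre-allocated buffer reused across rows, with the caller copying the result before the next call) can be sketched generically. All names here are hypothetical; it only illustrates the allocation trade-off, not the actual GenericInternalRow code:

```java
import java.util.function.Function;

// Reuse one mutable buffer across rows instead of allocating per row.
// The caller must copy the result before the next call, as an
// UnsafeProjection copy would in the real read path.
public class BufferReuseSketch {
  static Function<int[], int[]> buildMapper(int width) {
    int[] reused = new int[width]; // allocated once, shared across calls
    return row -> {
      for (int i = 0; i < width; i++) {
        reused[i] = row[i] * 2; // per-row transform
      }
      return reused; // same instance every call: NOT thread-safe
    };
  }

  public static void main(String[] args) {
    Function<int[], int[]> mapper = buildMapper(3);
    int[] first = mapper.apply(new int[] {1, 2, 3});
    System.out.println(first[2]);        // 6
    int[] second = mapper.apply(new int[] {4, 5, 6});
    System.out.println(first == second); // true: the buffer is shared
    System.out.println(second[2]);       // 12
  }
}
```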
* fix(vector): replace existential type Map[Int, _] with Map[Int, HoodieSchema.Vector] to fix Scala 2.12 type inference error
* minor compilation issue fix
* further self review fixes
* fix(vector): fix Scala import ordering style violations
- Move VectorConversionUtils import into the hudi group (was misplaced in 3rdParty)
- Add blank line between hudi and 3rdParty import groups
- Add blank line between java and scala import groups
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
* address balaji comments
* address balaji comment around putting hoodie vector metadata in the parquet footer
* address all initial core comments
* address perf comments
* remove comment
* get spark 3.3 working
* address voon comment
* reduce test code duplication
---------
Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
---
.../hudi/io/storage/HoodieSparkParquetReader.java | 26 +-
.../hudi/io/storage/VectorConversionUtils.java | 238 +++++
.../storage/row/HoodieRowParquetWriteSupport.java | 50 +
.../SparkFileFormatInternalRowReaderContext.scala | 89 +-
.../parquet/HoodieParquetFileFormatHelper.scala | 35 +-
.../apache/hudi/common/schema/HoodieSchema.java | 95 +-
.../HoodieSchemaComparatorForSchemaEvolution.java | 10 +
.../schema/HoodieSchemaCompatibilityChecker.java | 22 +
.../java/org/apache/hudi/internal/schema/Type.java | 7 +-
.../org/apache/hudi/internal/schema/Types.java | 70 ++
.../schema/convert/InternalSchemaConverter.java | 14 +
.../hudi/metadata/HoodieTableMetadataUtil.java | 7 +-
.../hudi/common/schema/TestHoodieSchema.java | 24 +-
...stHoodieSchemaComparatorForSchemaEvolution.java | 18 +
.../schema/TestHoodieSchemaCompatibility.java | 38 +
.../convert/TestInternalSchemaConverter.java | 58 ++
.../hudi/metadata/TestHoodieTableMetadataUtil.java | 9 +
.../apache/hudi/avro/HoodieAvroWriteSupport.java | 4 +
.../avro/AvroSchemaConverterWithTimestampNTZ.java | 12 +
.../HoodieFileGroupReaderBasedFileFormat.scala | 134 ++-
.../hudi/functional/TestVectorDataSource.scala | 1000 ++++++++++++++++++++
21 files changed, 1909 insertions(+), 51 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index 160a731cece4..80ed9be8420e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -64,7 +64,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import scala.Option$;
@@ -142,10 +144,20 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
  public ClosableIterator<UnsafeRow> getUnsafeRowIterator(HoodieSchema requestedSchema, List<Filter> readFilters) throws IOException {
HoodieSchema nonNullSchema = requestedSchema.getNonNullType();
    StructType structSchema = HoodieInternalRowUtils.getCachedSchema(nonNullSchema);
+
+    // Detect vector columns: ordinal → Vector schema
+    Map<Integer, HoodieSchema.Vector> vectorColumnInfo = VectorConversionUtils.detectVectorColumns(nonNullSchema);
+
+    // For vector columns, replace ArrayType(FloatType) with BinaryType in the read schema
+    // so SparkBasicSchemaEvolution sees matching types (file has FIXED_LEN_BYTE_ARRAY → BinaryType)
+    StructType readStructSchema = vectorColumnInfo.isEmpty()
+        ? structSchema
+        : VectorConversionUtils.replaceVectorColumnsWithBinary(structSchema, vectorColumnInfo);
+
    Option<MessageType> messageSchema = Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema));
    boolean enableTimestampFieldRepair = storage.getConf().getBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR, true);
    StructType dataStructType = convertToStruct(enableTimestampFieldRepair ? SchemaRepair.repairLogicalTypes(getFileSchema(), messageSchema) : getFileSchema());
-    SparkBasicSchemaEvolution evolution = new SparkBasicSchemaEvolution(dataStructType, structSchema, SQLConf.get().sessionLocalTimeZone());
+    SparkBasicSchemaEvolution evolution = new SparkBasicSchemaEvolution(dataStructType, readStructSchema, SQLConf.get().sessionLocalTimeZone());
    String readSchemaJson = evolution.getRequestSchema().json();
    SQLConf sqlConf = SQLConf.get();
    storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readSchemaJson);
@@ -184,6 +196,18 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
    UnsafeProjection projection = evolution.generateUnsafeProjection();
    ParquetReaderIterator<InternalRow> parquetReaderIterator = new ParquetReaderIterator<>(reader);
    CloseableMappingIterator<InternalRow, UnsafeRow> projectedIterator = new CloseableMappingIterator<>(parquetReaderIterator, projection::apply);
+
+    if (!vectorColumnInfo.isEmpty()) {
+      // Post-process: convert binary VECTOR columns back to typed arrays
+      UnsafeProjection vectorProjection = UnsafeProjection.create(structSchema);
+      Function<InternalRow, InternalRow> mapper =
+          VectorConversionUtils.buildRowMapper(readStructSchema, vectorColumnInfo, vectorProjection::apply);
+      CloseableMappingIterator<UnsafeRow, UnsafeRow> vectorIterator =
+          new CloseableMappingIterator<>(projectedIterator, row -> (UnsafeRow) mapper.apply(row));
+      readerIterators.add(vectorIterator);
+      return vectorIterator;
+    }
+
    readerIterators.add(projectedIterator);
    return projectedIterator;
  }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java
new file mode 100644
index 000000000000..2bf8e86b5d0f
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+
+import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.types.BinaryType$;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
+/**
+ * Shared utility methods for vector column handling during Parquet read/write.
+ *
+ * Vectors are stored as Parquet FIXED_LEN_BYTE_ARRAY columns. On read, Spark maps these
+ * to BinaryType. This class provides the canonical conversion between the binary
+ * representation and Spark's typed ArrayData (float[], double[], byte[]).
+ */
+public final class VectorConversionUtils {
+
+ private VectorConversionUtils() {
+ }
+
+  /**
+   * Detects VECTOR columns in a HoodieSchema record and returns a map of field ordinal
+   * to the corresponding {@link HoodieSchema.Vector} schema.
+   *
+   * @param schema a HoodieSchema of type RECORD (or null)
+   * @return map from field index to Vector schema; empty map if schema is null or has no vectors
+   */
+  public static Map<Integer, HoodieSchema.Vector> detectVectorColumns(HoodieSchema schema) {
+    Map<Integer, HoodieSchema.Vector> vectorColumnInfo = new HashMap<>();
+    if (schema == null) {
+      return vectorColumnInfo;
+    }
+    List<HoodieSchemaField> fields = schema.getFields();
+    for (int i = 0; i < fields.size(); i++) {
+      HoodieSchema fieldSchema = fields.get(i).schema().getNonNullType();
+      if (fieldSchema.getType() == HoodieSchemaType.VECTOR) {
+        vectorColumnInfo.put(i, (HoodieSchema.Vector) fieldSchema);
+      }
+    }
+    return vectorColumnInfo;
+  }
+
+  /**
+   * Detects VECTOR columns from Spark StructType metadata annotations.
+   * Fields with metadata key {@link HoodieSchema#TYPE_METADATA_FIELD} starting with "VECTOR"
+   * are parsed and included.
+   *
+   * @param schema Spark StructType
+   * @return map from field index to Vector schema; empty map if no vectors found
+   */
+  public static Map<Integer, HoodieSchema.Vector> detectVectorColumnsFromMetadata(StructType schema) {
+    Map<Integer, HoodieSchema.Vector> vectorColumnInfo = new HashMap<>();
+    if (schema == null) {
+      return vectorColumnInfo;
+    }
+    StructField[] fields = schema.fields();
+    for (int i = 0; i < fields.length; i++) {
+      StructField field = fields[i];
+      if (field.metadata().contains(HoodieSchema.TYPE_METADATA_FIELD)) {
+        String typeStr = field.metadata().getString(HoodieSchema.TYPE_METADATA_FIELD);
+        if (typeStr.startsWith("VECTOR")) {
+          HoodieSchema parsed = HoodieSchema.parseTypeDescriptor(typeStr);
+          if (parsed.getType() == HoodieSchemaType.VECTOR) {
+            vectorColumnInfo.put(i, (HoodieSchema.Vector) parsed);
+          }
+        }
+      }
+    }
+    return vectorColumnInfo;
+  }
+
+  /**
+   * Replaces ArrayType with BinaryType for VECTOR columns so the Parquet reader
+   * can read FIXED_LEN_BYTE_ARRAY data without type mismatch.
+   *
+   * @param structType the original Spark schema
+   * @param vectorColumns map of ordinal to vector info (only the key set is used)
+   * @return a new StructType with vector columns replaced by BinaryType
+   */
+  public static StructType replaceVectorColumnsWithBinary(StructType structType, Map<Integer, ?> vectorColumns) {
+    StructField[] fields = structType.fields();
+    StructField[] newFields = new StructField[fields.length];
+    for (int i = 0; i < fields.length; i++) {
+      if (vectorColumns.containsKey(i)) {
+        // Preserve the original field metadata (including hudi_type) so that downstream code
+        // calling detectVectorColumnsFromMetadata on the modified schema still finds vectors.
+        newFields[i] = new StructField(fields[i].name(), BinaryType$.MODULE$, fields[i].nullable(), fields[i].metadata());
+      } else {
+        newFields[i] = fields[i];
+      }
+    }
+    return new StructType(newFields);
+  }
+
+  /**
+   * Converts binary bytes from a FIXED_LEN_BYTE_ARRAY Parquet column back to a typed array
+   * based on the vector's element type and dimension.
+   *
+   * @param bytes raw bytes read from Parquet
+   * @param vectorSchema the vector schema describing dimension and element type
+   * @return an ArrayData containing the decoded float[], double[], or byte[] array
+   * @throws IllegalArgumentException if byte array length doesn't match expected size
+   */
+  public static ArrayData convertBinaryToVectorArray(byte[] bytes, HoodieSchema.Vector vectorSchema) {
+    return convertBinaryToVectorArray(bytes, vectorSchema.getDimension(), vectorSchema.getVectorElementType());
+  }
+
+  /**
+   * Converts binary bytes from a FIXED_LEN_BYTE_ARRAY Parquet column back to a typed array.
+   *
+   * @param bytes raw bytes read from Parquet
+   * @param dim vector dimension (number of elements)
+   * @param elemType element type (FLOAT, DOUBLE, or INT8)
+   * @return an ArrayData containing the decoded float[], double[], or byte[] array
+   * @throws IllegalArgumentException if byte array length doesn't match expected size
+   */
+  public static ArrayData convertBinaryToVectorArray(byte[] bytes, int dim,
+                                                     HoodieSchema.Vector.VectorElementType elemType) {
+    int expectedSize = dim * elemType.getElementSize();
+    checkArgument(bytes.length == expectedSize,
+        "Vector byte array length mismatch: expected " + expectedSize + " but got " + bytes.length);
+    ByteBuffer buffer = ByteBuffer.wrap(bytes).order(HoodieSchema.VectorLogicalType.VECTOR_BYTE_ORDER);
+    switch (elemType) {
+      case FLOAT:
+        float[] floats = new float[dim];
+        for (int j = 0; j < dim; j++) {
+          floats[j] = buffer.getFloat();
+        }
+        return new GenericArrayData(floats);
+      case DOUBLE:
+        double[] doubles = new double[dim];
+        for (int j = 0; j < dim; j++) {
+          doubles[j] = buffer.getDouble();
+        }
+        return new GenericArrayData(doubles);
+      case INT8:
+        byte[] int8s = new byte[dim];
+        buffer.get(int8s);
+        // Use UnsafeArrayData to avoid boxing each byte into a Byte object.
+        // GenericArrayData(byte[]) would box every element into Object[].
+        return UnsafeArrayData.fromPrimitiveArray(int8s);
+      default:
+        throw new UnsupportedOperationException("Unsupported vector element type: " + elemType);
+    }
+  }
+
+  /**
+   * Returns a {@link Function} that converts a single {@link InternalRow} by converting binary
+   * vector columns back to typed arrays and then applying the given projection callback.
+   *
+   * <p>Ordinals in {@code vectorColumns} must be relative to {@code readSchema}, the schema
+   * that has {@code BinaryType} for vector columns (as produced by
+   * {@link #replaceVectorColumnsWithBinary}).
+   *
+   * <p><b>Thread safety:</b> The returned function is NOT thread-safe; it reuses a single
+   * {@link GenericInternalRow} buffer across calls. Each call to this factory creates its own
+   * buffer, so separate functions returned by separate calls are independent.
+   *
+   * @param readSchema the Spark schema of incoming rows (BinaryType for vector columns)
+   * @param vectorColumns map of ordinal → Vector schema for vector columns, keyed by ordinals relative to {@code readSchema}
+   * @param projectionCallback called with the converted {@link GenericInternalRow}; must copy any data it needs to retain (e.g. {@code UnsafeProjection::apply})
+   * @return a function that converts one row and returns the projected result
+   */
+  public static Function<InternalRow, InternalRow> buildRowMapper(
+      StructType readSchema,
+      Map<Integer, HoodieSchema.Vector> vectorColumns,
+      Function<InternalRow, InternalRow> projectionCallback) {
+    GenericInternalRow converted = new GenericInternalRow(readSchema.fields().length);
+    return row -> {
+      convertRowVectorColumns(row, converted, readSchema, vectorColumns);
+      return projectionCallback.apply(converted);
+    };
+  }
+
+  /**
+   * Converts vector columns in a row from binary (BinaryType) back to typed arrays,
+   * copying non-vector columns as-is. The caller must supply a pre-allocated
+   * {@link GenericInternalRow} for reuse across iterations to reduce GC pressure.
+   *
+   * @param row the source row (with BinaryType for vector columns)
+   * @param result a pre-allocated GenericInternalRow to write into (reused across calls)
+   * @param readSchema the Spark schema of the source row (BinaryType for vector columns)
+   * @param vectorColumns map of ordinal to Vector schema for vector columns
+   */
+  public static void convertRowVectorColumns(InternalRow row, GenericInternalRow result,
+                                             StructType readSchema,
+                                             Map<Integer, HoodieSchema.Vector> vectorColumns) {
+    int numFields = readSchema.fields().length;
+    for (int i = 0; i < numFields; i++) {
+      if (row.isNullAt(i)) {
+        result.setNullAt(i);
+      } else if (vectorColumns.containsKey(i)) {
+        result.update(i, convertBinaryToVectorArray(row.getBinary(i), vectorColumns.get(i)));
+      } else {
+        // Non-vector column: copy value as-is using the read schema's data type
+        result.update(i, row.get(i, readSchema.apply(i).dataType()));
+      }
+    }
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
index be2ed64ece72..9ee2abaec93a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
@@ -68,6 +68,7 @@ import org.apache.spark.sql.types.YearMonthIntervalType;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.util.VersionUtils;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -163,6 +164,10 @@ public class HoodieRowParquetWriteSupport extends WriteSupport<InternalRow> {
      metadata.put("org.apache.spark.legacyDateTime", "");
      metadata.put("org.apache.spark.timeZone", SQLConf.get().sessionLocalTimeZone());
    }
+    String vectorMeta = HoodieSchema.buildVectorColumnsMetadataValue(schema);
+    if (!vectorMeta.isEmpty()) {
+      metadata.put(HoodieSchema.PARQUET_VECTOR_COLUMNS_METADATA_KEY, vectorMeta);
+    }
    Configuration configurationCopy = new Configuration(configuration);
    configurationCopy.set(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, Boolean.toString(writeLegacyListFormat));
    MessageType messageType = convert(structType, schema);
@@ -304,6 +309,43 @@ public class HoodieRowParquetWriteSupport extends WriteSupport<InternalRow> {
        }
        recordConsumer.addBinary(Binary.fromReusedByteArray(fixedLengthBytes, 0, numBytes));
      };
+    } else if (dataType instanceof ArrayType
+        && resolvedSchema != null
+        && resolvedSchema.getType() == HoodieSchemaType.VECTOR) {
+      HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) resolvedSchema;
+      int dimension = vectorSchema.getDimension();
+      HoodieSchema.Vector.VectorElementType elemType = vectorSchema.getVectorElementType();
+      int bufferSize = Math.multiplyExact(dimension, elemType.getElementSize());
+      // Buffer is reused across rows without copying. Binary.fromReusedByteArray wraps the
+      // same backing array. This is safe because Parquet's ColumnWriteStoreV2 consumes the
+      // bytes before the next write call (same pattern as the decimal path with decimalBuffer).
+      ByteBuffer buffer = ByteBuffer.allocate(bufferSize).order(HoodieSchema.VectorLogicalType.VECTOR_BYTE_ORDER);
+      return (row, ordinal) -> {
+        ArrayData array = row.getArray(ordinal);
+        ValidationUtils.checkArgument(array.numElements() == dimension,
+            () -> String.format("Vector dimension mismatch: schema expects %d elements but got %d", dimension, array.numElements()));
+        buffer.clear();
+        switch (elemType) {
+          case FLOAT:
+            for (int i = 0; i < dimension; i++) {
+              buffer.putFloat(array.getFloat(i));
+            }
+            break;
+          case DOUBLE:
+            for (int i = 0; i < dimension; i++) {
+              buffer.putDouble(array.getDouble(i));
+            }
+            break;
+          case INT8:
+            for (int i = 0; i < dimension; i++) {
+              buffer.put(array.getByte(i));
+            }
+            break;
+          default:
+            throw new UnsupportedOperationException("Unsupported vector element type: " + elemType);
+        }
+        recordConsumer.addBinary(Binary.fromReusedByteArray(buffer.array()));
+      };
    } else if (dataType instanceof ArrayType) {
      ValueWriter elementWriter = makeWriter(resolvedSchema == null ? null : resolvedSchema.getElementType(), ((ArrayType) dataType).elementType());
      if (!writeLegacyListFormat) {
@@ -489,6 +531,14 @@ public class HoodieRowParquetWriteSupport extends WriteSupport<InternalRow> {
          .as(LogicalTypeAnnotation.decimalType(scale, precision))
          .length(Decimal.minBytesForPrecision()[precision])
          .named(structField.name());
+    } else if (dataType instanceof ArrayType
+        && resolvedSchema != null
+        && resolvedSchema.getType() == HoodieSchemaType.VECTOR) {
+      HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) resolvedSchema;
+      int fixedSize = Math.multiplyExact(vectorSchema.getDimension(),
+          vectorSchema.getVectorElementType().getElementSize());
+      return Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+          .length(fixedSize).named(structField.name());
    } else if (dataType instanceof ArrayType) {
      ArrayType arrayType = (ArrayType) dataType;
      DataType elementType = arrayType.elementType();
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index e4d97a2df6a2..88c39656a918 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -30,18 +30,18 @@ import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator, Pair => HPair}
-import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader}
+import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader, VectorConversionUtils}
import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath}
import org.apache.hudi.util.CloseableInternalRowIterator
import org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection}
import org.apache.spark.sql.execution.datasources.{PartitionedFile, SparkColumnarFileReader}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{LongType, MetadataBuilder, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, ByteType, DoubleType, FloatType, LongType, MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import scala.collection.JavaConverters._
@@ -81,7 +81,17 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
      assert(getRecordContext.supportsParquetRowIndex())
    }
    val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
-    val (readSchema, readFilters) = getSchemaAndFiltersForRead(structType, hasRowIndexField)
+
+    // Detect VECTOR columns and replace with BinaryType for the Parquet reader
+    // (Parquet stores VECTOR as FIXED_LEN_BYTE_ARRAY which Spark maps to BinaryType)
+    val vectorColumnInfo = SparkFileFormatInternalRowReaderContext.detectVectorColumns(requiredSchema)
+    val parquetReadStructType = if (vectorColumnInfo.nonEmpty) {
+      SparkFileFormatInternalRowReaderContext.replaceVectorColumnsWithBinary(structType, vectorColumnInfo)
+    } else {
+      structType
+    }
+
+    val (readSchema, readFilters) = getSchemaAndFiltersForRead(parquetReadStructType, hasRowIndexField)
    if (FSUtils.isLogFile(filePath)) {
      // NOTE: now only primary key based filtering is supported for log files
      new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
@@ -100,9 +110,16 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
    } else {
      org.apache.hudi.common.util.Option.empty[org.apache.parquet.schema.MessageType]()
    }
-    new CloseableInternalRowIterator(baseFileReader.read(fileInfo,
+    val rawIterator = new CloseableInternalRowIterator(baseFileReader.read(fileInfo,
      readSchema, StructType(Seq.empty), getSchemaHandler.getInternalSchemaOpt, readFilters,
      storage.getConf.asInstanceOf[StorageConfiguration[Configuration]], tableSchemaOpt))
+
+    // Post-process: convert binary VECTOR columns back to typed arrays
+    if (vectorColumnInfo.nonEmpty) {
+      SparkFileFormatInternalRowReaderContext.wrapWithVectorConversion(rawIterator, vectorColumnInfo, readSchema)
+    } else {
+      rawIterator
+    }
  }
}
@@ -290,4 +307,66 @@ object SparkFileFormatInternalRowReaderContext {
    field.name.equals(ROW_INDEX_TEMPORARY_COLUMN_NAME)
  }
+  /**
+   * Detects VECTOR columns from HoodieSchema.
+   * Delegates to [[VectorConversionUtils.detectVectorColumns]].
+   * @return Map of ordinal to Vector schema for VECTOR fields.
+   */
+  private[hudi] def detectVectorColumns(schema: HoodieSchema): Map[Int, HoodieSchema.Vector] = {
+    VectorConversionUtils.detectVectorColumns(schema).asScala.map { case (k, v) => (k.intValue(), v) }.toMap
+  }
+
+  /**
+   * Detects VECTOR columns from Spark StructType metadata.
+   * Delegates to [[VectorConversionUtils.detectVectorColumnsFromMetadata]].
+   * @return Map of ordinal to Vector schema for VECTOR fields.
+   */
+  def detectVectorColumnsFromMetadata(schema: StructType): Map[Int, HoodieSchema.Vector] = {
+    VectorConversionUtils.detectVectorColumnsFromMetadata(schema).asScala.map { case (k, v) => (k.intValue(), v) }.toMap
+  }
+
+  /**
+   * Replaces ArrayType with BinaryType for VECTOR columns so the Parquet reader
+   * can read FIXED_LEN_BYTE_ARRAY data without type mismatch.
+   * Delegates to [[VectorConversionUtils.replaceVectorColumnsWithBinary]].
+   */
+  def replaceVectorColumnsWithBinary(structType: StructType, vectorColumns: Map[Int, HoodieSchema.Vector]): StructType = {
+    val javaMap = vectorColumns.map { case (k, v) => (Integer.valueOf(k), v.asInstanceOf[AnyRef]) }.asJava
+    VectorConversionUtils.replaceVectorColumnsWithBinary(structType, javaMap)
+  }
+
+  /**
+   * Wraps an iterator to convert binary VECTOR columns back to typed arrays.
+   * Unpacks bytes from FIXED_LEN_BYTE_ARRAY into GenericArrayData using the canonical vector byte order.
+   * Uses UnsafeProjection to make a defensive copy of each row.
+   */
+  private[hudi] def wrapWithVectorConversion(
+      iterator: ClosableIterator[InternalRow],
+      vectorColumns: Map[Int, HoodieSchema.Vector],
+      readSchema: StructType): ClosableIterator[InternalRow] = {
+    val javaVectorCols: java.util.Map[Integer, HoodieSchema.Vector] =
+      vectorColumns.map { case (k, v) => (Integer.valueOf(k), v) }.asJava
+    // Build output schema: replace BinaryType with the correct ArrayType for vector columns
+    val outputFields = readSchema.fields.zipWithIndex.map { case (field, i) =>
+      vectorColumns.get(i) match {
+        case Some(vec) =>
+          val elemType = vec.getVectorElementType match {
+            case HoodieSchema.Vector.VectorElementType.FLOAT => FloatType
+            case HoodieSchema.Vector.VectorElementType.DOUBLE => DoubleType
+            case HoodieSchema.Vector.VectorElementType.INT8 => ByteType
+          }
+          field.copy(dataType = ArrayType(elemType, containsNull = false))
+        case None => field
+      }
+    }
+    val outputSchema = StructType(outputFields)
+    val projection = UnsafeProjection.create(outputSchema)
+    val mapper = VectorConversionUtils.buildRowMapper(readSchema, javaVectorCols, projection.apply(_))
+    new ClosableIterator[InternalRow] {
+      override def hasNext: Boolean = iterator.hasNext
+      override def next(): InternalRow = mapper.apply(iterator.next())
+      override def close(): Unit = iterator.close()
+    }
+  }
+
}
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
index 03f9934f8b20..6b1934f5c4bf 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
@@ -20,18 +20,49 @@
package org.apache.spark.sql.execution.datasources.parquet
import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.HoodieSparkUtils
import org.apache.parquet.hadoop.metadata.FileMetaData
+import org.apache.parquet.schema.{MessageType, PrimitiveType, Types}
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.spark.sql.execution.datasources.SparkSchemaTransformUtils
import org.apache.spark.sql.types.{DataType, StructType}
+import scala.collection.JavaConverters._
+
object HoodieParquetFileFormatHelper {
def buildImplicitSchemaChangeInfo(hadoopConf: Configuration,
parquetFileMetaData: FileMetaData,
requiredSchema: StructType):
(java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType,
DataType]], StructType) = {
- // Convert Parquet schema to Spark StructType
+ val originalSchema = parquetFileMetaData.getSchema
+
+ // Spark 3.3's ParquetToSparkSchemaConverter throws "Illegal Parquet type:
FIXED_LEN_BYTE_ARRAY"
+ // for unannotated FLBA columns (e.g., Hudi VECTOR type). This was fixed
in Spark 3.4
+ // (SPARK-41096 / https://github.com/apache/spark/pull/38628) which maps
bare FLBA to BinaryType.
+ // On Spark 3.3 only, rewrite bare FLBA to BINARY before conversion.
+ val safeSchema = if (!HoodieSparkUtils.gteqSpark3_4) {
+ rewriteFixedLenByteArrayToBinary(originalSchema)
+ } else {
+ originalSchema
+ }
+
val convert = new ParquetToSparkSchemaConverter(hadoopConf)
- val fileStruct = convert.convert(parquetFileMetaData.getSchema)
+ val fileStruct = convert.convert(safeSchema)
SparkSchemaTransformUtils.buildImplicitSchemaChangeInfo(fileStruct,
requiredSchema)
}
+
+ /**
+ * Rewrites bare FIXED_LEN_BYTE_ARRAY columns (no logical type annotation)
to BINARY.
+ * Columns with annotations (e.g., DECIMAL, UUID) are left untouched.
+ */
+ private def rewriteFixedLenByteArrayToBinary(schema: MessageType):
MessageType = {
+ val fields = schema.getFields.asScala.map {
+ case pt: PrimitiveType
+ if pt.getPrimitiveTypeName == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
+ && pt.getLogicalTypeAnnotation == null =>
+ Types.primitive(PrimitiveTypeName.BINARY,
pt.getRepetition).named(pt.getName)
+ case other => other
+ }
+ new MessageType(schema.getName, fields.asJava)
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
index 4cf23ba17dc8..031fd210ab90 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
@@ -115,9 +115,9 @@ public class HoodieSchema implements Serializable {
if (params.isEmpty()) {
throw new IllegalArgumentException("VECTOR type descriptor must
include a dimension parameter");
}
- if (params.size() > 2) {
+ if (params.size() > 3) {
throw new IllegalArgumentException(
- "VECTOR type descriptor supports at most 2 parameters: dimension
and optional element type");
+ "VECTOR type descriptor supports at most 3 parameters:
dimension, optional element type, and optional storage backing");
}
int dimension;
try {
@@ -128,7 +128,10 @@ public class HoodieSchema implements Serializable {
Vector.VectorElementType elementType = params.size() > 1
? Vector.VectorElementType.fromString(params.get(1))
: Vector.VectorElementType.FLOAT;
- return createVector(dimension, elementType);
+ Vector.StorageBacking backing = params.size() > 2
+ ? Vector.StorageBacking.fromString(params.get(2))
+ : Vector.StorageBacking.FIXED_BYTES;
+ return createVector(dimension, elementType, backing);
case BLOB:
if (!params.isEmpty()) {
throw new IllegalArgumentException(
@@ -199,6 +202,42 @@ public class HoodieSchema implements Serializable {
public static final String PARQUET_ARRAY_SPARK = ".array";
public static final String PARQUET_ARRAY_AVRO = "." + ARRAY_LIST_ELEMENT;
+ /**
+ * Parquet file-footer metadata key under which VECTOR column names and type
descriptors
+ * are recorded. The value is a comma-separated list of {@code colName:VECTOR(dim[, elemType[, storageBacking]])}
+ * entries (defaults omitted), e.g. {@code "embedding:VECTOR(128),tags:VECTOR(64, INT8)"}.
+ *
+ * <p>Stored as file-level key-value metadata (Parquet footer) so that any
reader can
+ * identify vector columns without needing the Hudi schema store.
+ */
+ public static final String PARQUET_VECTOR_COLUMNS_METADATA_KEY =
"hoodie.vector.columns";
+
+ /**
+ * Builds the value string for {@link #PARQUET_VECTOR_COLUMNS_METADATA_KEY}.
+ *
+ * @param schema a HoodieSchema of type RECORD (or null)
+ * @return comma-separated {@code colName:VECTOR(dim[, elemType[, storageBacking]])} entries, or empty string
+ * if the schema is null or has no VECTOR columns
+ */
+ public static String buildVectorColumnsMetadataValue(HoodieSchema schema) {
+ if (schema == null || schema.isSchemaNull()) {
+ return "";
+ }
+ List<HoodieSchemaField> fields = schema.getFields();
+ StringBuilder sb = new StringBuilder();
+ for (HoodieSchemaField field : fields) {
+ HoodieSchema fieldSchema = field.schema().getNonNullType();
+ if (fieldSchema.getType() == HoodieSchemaType.VECTOR) {
+ Vector vectorSchema = (Vector) fieldSchema;
+ if (sb.length() > 0) {
+ sb.append(',');
+ }
+ sb.append(field.name()).append(':').append(vectorSchema.toTypeDescriptor());
+ }
+ }
+ return sb.toString();
+ }
+
private Schema avroSchema;
private HoodieSchemaType type;
private transient List<HoodieSchemaField> fields;
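As a concrete illustration of the footer value format documented above: a self-contained sketch of the comma-separated `colName:descriptor` concatenation. The class and the sample column names are hypothetical; the real method walks the record's HoodieSchemaField entries and uses each vector's toTypeDescriptor output.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of building the "hoodie.vector.columns" footer value:
// comma-separated colName:descriptor entries.
// Column names and descriptors below are examples only.
public class VectorColumnsMetadataDemo {
  public static String buildValue(Map<String, String> vectorColsToDescriptor) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> e : vectorColsToDescriptor.entrySet()) {
      if (sb.length() > 0) {
        sb.append(',');
      }
      sb.append(e.getKey()).append(':').append(e.getValue());
    }
    return sb.toString();
  }

  // Sample value used by main and the assertions.
  public static String demoValue() {
    Map<String, String> cols = new LinkedHashMap<>();
    cols.put("embedding", "VECTOR(128)");  // FLOAT + FIXED_BYTES defaults omitted
    cols.put("tags", "VECTOR(64, INT8)");  // non-default element type spelled out
    return buildValue(cols);
  }

  public static void main(String[] args) {
    System.out.println(demoValue());
  }
}
```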
@@ -770,22 +809,50 @@ public class HoodieSchema implements Serializable {
* @return new HoodieSchema.Vector
*/
public static HoodieSchema.Vector createVector(int dimension,
Vector.VectorElementType elementType) {
+ return createVector(dimension, elementType,
Vector.StorageBacking.FIXED_BYTES);
+ }
+
+ /**
+ * Creates Vector schema with custom dimension, element type, and storage
backing.
+ *
+ * @param dimension vector dimension (must be > 0)
+ * @param elementType element type
+ * @param storageBacking physical storage format
+ * @return new HoodieSchema.Vector
+ */
+ public static HoodieSchema.Vector createVector(int dimension,
Vector.VectorElementType elementType,
+ Vector.StorageBacking
storageBacking) {
String vectorName = Vector.DEFAULT_NAME + "_" +
elementType.name().toLowerCase() + "_" + dimension;
- return createVector(vectorName, dimension, elementType);
+ return createVector(vectorName, dimension, elementType, storageBacking);
}
/**
* Creates Vector schema with custom name, dimension, and element type.
+ * Defaults to {@link Vector.StorageBacking#FIXED_BYTES}.
*
* @param name FIXED type name (must not be null or empty)
* @param dimension vector dimension (must be > 0)
- * @param elementType element type (use {@link
Vector.VectorElementType#FLOAT} or {@link Vector.VectorElementType#DOUBLE})
+ * @param elementType element type
* @return new HoodieSchema.Vector
*/
public static HoodieSchema.Vector createVector(String name, int dimension,
Vector.VectorElementType elementType) {
+ return createVector(name, dimension, elementType,
Vector.StorageBacking.FIXED_BYTES);
+ }
+
+ /**
+ * Creates Vector schema with custom name, dimension, element type, and
storage backing.
+ *
+ * @param name FIXED type name (must not be null or empty)
+ * @param dimension vector dimension (must be > 0)
+ * @param elementType element type
+ * @param storageBacking physical storage format
+ * @return new HoodieSchema.Vector
+ */
+ public static HoodieSchema.Vector createVector(String name, int dimension,
Vector.VectorElementType elementType,
+ Vector.StorageBacking
storageBacking) {
ValidationUtils.checkArgument(name != null && !name.isEmpty(),
() -> "Vector name must not be null or empty");
- Schema vectorSchema = Vector.createSchema(name, dimension, elementType);
+ Schema vectorSchema = Vector.createSchema(name, dimension, elementType,
storageBacking);
return new HoodieSchema.Vector(vectorSchema);
}
@@ -1820,10 +1887,14 @@ public class HoodieSchema implements Serializable {
* Default element type (FLOAT) is omitted.
*/
public String toTypeDescriptor() {
- if (getVectorElementType() == VectorElementType.FLOAT) {
+ boolean defaultElemType = getVectorElementType() ==
VectorElementType.FLOAT;
+ boolean defaultBacking = getStorageBacking() ==
StorageBacking.FIXED_BYTES;
+ if (defaultElemType && defaultBacking) {
return "VECTOR(" + getDimension() + ")";
+ } else if (defaultBacking) {
+ return "VECTOR(" + getDimension() + ", " + getVectorElementType() +
")";
}
- return "VECTOR(" + getDimension() + ", " + getVectorElementType() + ")";
+ return "VECTOR(" + getDimension() + ", " + getVectorElementType() + ", "
+ getStorageBacking() + ")";
}
/**
@@ -1835,11 +1906,17 @@ public class HoodieSchema implements Serializable {
* @return new Vector schema
*/
private static Schema createSchema(String name, int dimension,
VectorElementType elementType) {
+ return createSchema(name, dimension, elementType,
StorageBacking.FIXED_BYTES);
+ }
+
+ private static Schema createSchema(String name, int dimension,
VectorElementType elementType,
+ StorageBacking storageBacking) {
ValidationUtils.checkArgument(dimension > 0,
() -> "Vector dimension must be positive: " + dimension);
// Validate elementType
VectorElementType resolvedElementType = elementType != null ?
elementType : VectorElementType.FLOAT;
+ StorageBacking resolvedBacking = storageBacking != null ? storageBacking
: StorageBacking.FIXED_BYTES;
// Calculate fixed size: dimension × element size in bytes
int elementSize = resolvedElementType.getElementSize();
@@ -1849,7 +1926,7 @@ public class HoodieSchema implements Serializable {
Schema vectorSchema = Schema.createFixed(name, null, null, fixedSize);
// Apply logical type with properties directly to FIXED
- VectorLogicalType vectorLogicalType = new VectorLogicalType(dimension,
resolvedElementType.name(), StorageBacking.FIXED_BYTES.name());
+ VectorLogicalType vectorLogicalType = new VectorLogicalType(dimension,
resolvedElementType.name(), resolvedBacking.name());
vectorLogicalType.addToSchema(vectorSchema);
return vectorSchema;
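Since the fixed size above is dimension × element size, the PR description calls out a Math.multiplyExact guard against int overflow on the write path. A minimal sketch of that guard (FixedSizeDemo is illustrative, not the Hudi class):

```java
// Sketch of guarding dimension * elementSize against int overflow with
// Math.multiplyExact, as the PR description mentions for the write support.
public class FixedSizeDemo {
  public static int fixedSize(int dimension, int elementSize) {
    try {
      return Math.multiplyExact(dimension, elementSize);
    } catch (ArithmeticException e) {
      throw new IllegalArgumentException(
          "Vector byte size overflows int: dimension=" + dimension
              + ", elementSize=" + elementSize, e);
    }
  }

  public static void main(String[] args) {
    // FLOAT is 4 bytes per element, so a 128-dim FLOAT vector stores 512 bytes.
    System.out.println(fixedSize(128, 4));
  }
}
```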
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java
index e3f28165f0db..c38cf2286370 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java
@@ -164,6 +164,8 @@ public class HoodieSchemaComparatorForSchemaEvolution {
return mapSchemaEquals(s1, s2);
case FIXED:
return fixedSchemaEquals(s1, s2);
+ case VECTOR:
+ return vectorSchemaEquals(s1, s2);
case UNION:
return unionSchemaEquals(s1, s2);
case STRING:
@@ -289,6 +291,14 @@ public class HoodieSchemaComparatorForSchemaEvolution {
return s1.getName().equals(s2.getName()) && s1.getFixedSize() ==
s2.getFixedSize();
}
+ private static boolean vectorSchemaEquals(HoodieSchema s1, HoodieSchema s2) {
+ HoodieSchema.Vector v1 = (HoodieSchema.Vector) s1;
+ HoodieSchema.Vector v2 = (HoodieSchema.Vector) s2;
+ return v1.getDimension() == v2.getDimension()
+ && v1.getVectorElementType() == v2.getVectorElementType()
+ && v1.getStorageBacking() == v2.getStorageBacking();
+ }
+
private static boolean decimalSchemaEquals(HoodieSchema s1, HoodieSchema s2)
{
HoodieSchema.Decimal d1 = (HoodieSchema.Decimal) s1;
HoodieSchema.Decimal d2 = (HoodieSchema.Decimal) s2;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibilityChecker.java
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibilityChecker.java
index 66935b6d1745..6897fc96ab1b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibilityChecker.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibilityChecker.java
@@ -310,6 +310,8 @@ public class HoodieSchemaCompatibilityChecker {
case FIXED:
result = result.mergedWith(checkSchemaNames(reader, writer,
locations));
return result.mergedWith(checkFixedSize(reader, writer,
locations));
+ case VECTOR:
+ return result.mergedWith(checkVectorCompatibility(reader, writer,
locations));
case DECIMAL:
return result.mergedWith(checkDecimalWidening(reader, writer,
locations));
case ENUM:
@@ -377,6 +379,7 @@ public class HoodieSchemaCompatibilityChecker {
case MAP:
return result.mergedWith(typeMismatch(reader, writer, locations));
case FIXED:
+ case VECTOR:
return result.mergedWith(typeMismatch(reader, writer, locations));
case ENUM:
return result.mergedWith(typeMismatch(reader, writer, locations));
@@ -457,6 +460,25 @@ public class HoodieSchemaCompatibilityChecker {
return checkDecimalWidening(reader, writer, locations);
}
+ // Convention: "expected" = writer (existing data), "found" = reader
(evolved schema).
+ // This matches checkFixedSize, checkDecimalWidening,
checkTimeCompatibility, etc.
+ private SchemaCompatibilityResult checkVectorCompatibility(final
HoodieSchema reader, final HoodieSchema writer,
+ final
Deque<LocationInfo> locations) {
+ HoodieSchema.Vector readerVector = (HoodieSchema.Vector) reader;
+ HoodieSchema.Vector writerVector = (HoodieSchema.Vector) writer;
+ if (readerVector.getDimension() != writerVector.getDimension()
+ || readerVector.getVectorElementType() !=
writerVector.getVectorElementType()
+ || readerVector.getStorageBacking() !=
writerVector.getStorageBacking()) {
+ String message = String.format("Vector field '%s' expected dimension:
%d, elementType: %s, storageBacking: %s, found: dimension: %d, elementType: %s,
storageBacking: %s",
+ getLocationName(locations, reader.getType()),
+ writerVector.getDimension(), writerVector.getVectorElementType(),
writerVector.getStorageBacking(),
+ readerVector.getDimension(), readerVector.getVectorElementType(),
readerVector.getStorageBacking());
+ return SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.TYPE_MISMATCH, reader, writer,
+ message, asList(locations));
+ }
+ return SchemaCompatibilityResult.compatible();
+ }
+
private SchemaCompatibilityResult checkDecimalWidening(final HoodieSchema
reader, final HoodieSchema writer,
final
Deque<LocationInfo> locations) {
boolean isReaderDecimal = reader.getType() == HoodieSchemaType.DECIMAL;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java
index da67b952258e..1d9ae8c1b686 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java
@@ -73,7 +73,12 @@ public interface Type extends Serializable {
TIME_MILLIS(Integer.class),
TIMESTAMP_MILLIS(Long.class),
LOCAL_TIMESTAMP_MILLIS(Long.class),
- LOCAL_TIMESTAMP_MICROS(Long.class);
+ LOCAL_TIMESTAMP_MICROS(Long.class),
+ // ByteBuffer.class is a placeholder — classTag is currently unused in the
codebase.
+ // The actual storage format is determined by VectorType.storageBacking
(e.g.,
+ // PARQUET_FIXED_LEN_BYTE_ARRAY). If new backing types are added, this
class tag
+ // may need to become backing-dependent or remain unused.
+ VECTOR(ByteBuffer.class);
private final String name;
private final Class<?> classTag;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java
index 86ec29da9c45..aaac7b51928d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java
@@ -18,6 +18,7 @@
package org.apache.hudi.internal.schema;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.internal.schema.Type.NestedType;
import org.apache.hudi.internal.schema.Type.PrimitiveType;
@@ -298,6 +299,75 @@ public class Types {
}
}
+ /**
+ * Vector type that preserves dimension, element type, and storage backing
+ * through InternalSchema round-trips.
+ *
+ * <p>This class is part of the InternalSchema type system (separate from
HoodieSchema)
+ * and follows the same pattern as {@link FixedType}, {@link
DecimalTypeFixed}, etc.
+ * It cannot be replaced with {@code HoodieSchema.Vector} because they
belong to
+ * different type hierarchies.
+ */
+ public static class VectorType extends PrimitiveType {
+ private final int dimension;
+ private final String elementType;
+ private final String storageBacking;
+
+ public static VectorType get(int dimension, String elementType, String
storageBacking) {
+ return new VectorType(dimension, elementType, storageBacking);
+ }
+
+ private VectorType(int dimension, String elementType, String
storageBacking) {
+ // Validate that the strings correspond to known enum values to fail
fast on typos
+ HoodieSchema.Vector.VectorElementType.fromString(elementType);
+ HoodieSchema.Vector.StorageBacking.fromString(storageBacking);
+ this.dimension = dimension;
+ this.elementType = elementType;
+ this.storageBacking = storageBacking;
+ }
+
+ public int getDimension() {
+ return dimension;
+ }
+
+ public String getElementType() {
+ return elementType;
+ }
+
+ public String getStorageBacking() {
+ return storageBacking;
+ }
+
+ @Override
+ public TypeID typeId() {
+ return TypeID.VECTOR;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("vector[%d, %s, %s]", dimension, elementType,
storageBacking);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (!(o instanceof VectorType)) {
+ return false;
+ }
+
+ VectorType that = (VectorType) o;
+ return dimension == that.dimension
+ && Objects.equals(elementType, that.elementType)
+ && Objects.equals(storageBacking, that.storageBacking);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(VectorType.class, dimension, elementType,
storageBacking);
+ }
+ }
+
public abstract static class DecimalBase extends PrimitiveType {
protected final int scale;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java
index 9783ecac1b5a..c3bbedf319d0 100644
---
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java
+++
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java
@@ -314,6 +314,12 @@ public class InternalSchemaConverter {
return Types.StringType.get();
case FIXED:
return Types.FixedType.getFixed(schema.getFixedSize());
+ case VECTOR:
+ HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) schema;
+ return Types.VectorType.get(
+ vectorSchema.getDimension(),
+ vectorSchema.getVectorElementType().name(),
+ vectorSchema.getStorageBacking().name());
case BYTES:
return Types.BinaryType.get();
case UUID:
@@ -538,6 +544,14 @@ public class InternalSchemaConverter {
return HoodieSchema.createFixed(name, null, null,
fixed.getFixedSize());
}
+ case VECTOR: {
+ Types.VectorType vector = (Types.VectorType) primitive;
+ return HoodieSchema.createVector(
+ vector.getDimension(),
+ HoodieSchema.Vector.VectorElementType.fromString(vector.getElementType()),
+ HoodieSchema.Vector.StorageBacking.fromString(vector.getStorageBacking()));
+ }
+
case DECIMAL:
case DECIMAL_FIXED: {
Types.DecimalTypeFixed decimal = (Types.DecimalTypeFixed) primitive;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index bfa0c803fe64..2babb429260e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -284,11 +284,14 @@ public class HoodieTableMetadataUtil {
String fieldName = fieldNameFieldPair.getKey();
HoodieSchemaField field = fieldNameFieldPair.getValue();
HoodieSchema fieldSchema = field.schema().getNonNullType();
+ if (!isColumnTypeSupported(fieldSchema,
Option.of(record.getRecordType()), indexVersion)) {
+ return;
+ }
ColumnStats colStats = allColumnStats.computeIfAbsent(fieldName,
ignored -> new ColumnStats(getValueMetadata(fieldSchema, indexVersion)));
Object fieldValue = collectColumnRangeFieldValue(record,
colStats.valueMetadata, fieldName, fieldSchema, recordSchema, properties);
colStats.valueCount++;
- if (fieldValue != null && isColumnTypeSupported(fieldSchema,
Option.of(record.getRecordType()), indexVersion)) {
+ if (fieldValue != null) {
// Set the min value of the field
if (colStats.minValue == null
|| ConvertingGenericData.INSTANCE.compare(fieldValue,
colStats.minValue, fieldSchema.toAvroSchema()) < 0) {
@@ -2056,7 +2059,7 @@ public class HoodieTableMetadataUtil {
// Check for precision and scale if the schema has a logical decimal type.
return type != HoodieSchemaType.RECORD && type != HoodieSchemaType.MAP
&& type != HoodieSchemaType.ARRAY && type != HoodieSchemaType.ENUM
- && type != HoodieSchemaType.BLOB;
+ && type != HoodieSchemaType.BLOB && type != HoodieSchemaType.VECTOR;
}
public static Set<String> getInflightMetadataPartitions(HoodieTableConfig
tableConfig) {
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
index 1032e228034d..1bf7d1621071 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
@@ -2272,6 +2272,18 @@ public class TestHoodieSchema {
HoodieSchema.Vector vector = (HoodieSchema.Vector) parsed;
assertEquals(512, vector.getDimension());
assertEquals(HoodieSchema.Vector.VectorElementType.DOUBLE,
vector.getVectorElementType());
+ assertEquals(HoodieSchema.Vector.StorageBacking.FIXED_BYTES,
vector.getStorageBacking());
+ }
+
+ @Test
+ public void testParseTypeDescriptorVectorWithStorageBacking() {
+ // Explicit storageBacking as 3rd param
+ HoodieSchema parsed = HoodieSchema.parseTypeDescriptor("VECTOR(128, FLOAT,
FIXED_BYTES)");
+ assertEquals(HoodieSchemaType.VECTOR, parsed.getType());
+ HoodieSchema.Vector vector = (HoodieSchema.Vector) parsed;
+ assertEquals(128, vector.getDimension());
+ assertEquals(HoodieSchema.Vector.VectorElementType.FLOAT,
vector.getVectorElementType());
+ assertEquals(HoodieSchema.Vector.StorageBacking.FIXED_BYTES,
vector.getStorageBacking());
}
@Test
@@ -2289,6 +2301,10 @@ public class TestHoodieSchema {
assertEquals(HoodieSchemaType.VECTOR, parsed.getType());
assertEquals(256, parsedVector.getDimension());
assertEquals(HoodieSchema.Vector.VectorElementType.FLOAT,
parsedVector.getVectorElementType());
+ assertEquals(HoodieSchema.Vector.StorageBacking.FIXED_BYTES,
parsedVector.getStorageBacking());
+
+ // Default backing should not appear in descriptor string
+ assertFalse(typeString.contains("FIXED_BYTES"), "Default storageBacking
should be omitted from descriptor");
// Non-default element type round-trip
HoodieSchema.Vector vectorDouble = HoodieSchema.createVector(64,
HoodieSchema.Vector.VectorElementType.DOUBLE);
@@ -2299,6 +2315,7 @@ public class TestHoodieSchema {
assertEquals(HoodieSchemaType.VECTOR, parsedDouble.getType());
assertEquals(64, parsedDoubleVector.getDimension());
assertEquals(HoodieSchema.Vector.VectorElementType.DOUBLE,
parsedDoubleVector.getVectorElementType());
+ assertEquals(HoodieSchema.Vector.StorageBacking.FIXED_BYTES,
parsedDoubleVector.getStorageBacking());
}
@Test
@@ -2318,12 +2335,15 @@ public class TestHoodieSchema {
assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("VECTOR"));
assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("VECTOR()"));
- // Too many parameters
- assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("VECTOR(128, FLOAT, extra)"));
+ // Too many parameters (4+)
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("VECTOR(128, FLOAT, FIXED_BYTES, extra)"));
// Invalid element type
assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("VECTOR(128, INVALID)"));
+ // Invalid storage backing
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("VECTOR(128, FLOAT, UNKNOWN_BACKING)"));
+
// Zero and negative dimensions
assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("VECTOR(0)"));
assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parseTypeDescriptor("VECTOR(-1)"));
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java
index 29d53f0c146c..09724cc54e91 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java
@@ -611,4 +611,22 @@ class TestHoodieSchemaComparatorForSchemaEvolution {
// BLOB with fields in different order should not be equal (order matters)
assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(blob1,
blobWithDifferentOrder));
}
+
+ @Test
+ void testVectorSchemaEquality() {
+ assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+ HoodieSchema.createVector(16,
HoodieSchema.Vector.VectorElementType.FLOAT),
+ HoodieSchema.createVector(16,
HoodieSchema.Vector.VectorElementType.FLOAT)
+ ));
+
+ assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+ HoodieSchema.createVector(16,
HoodieSchema.Vector.VectorElementType.FLOAT),
+ HoodieSchema.createVector(16,
HoodieSchema.Vector.VectorElementType.DOUBLE)
+ ));
+
+ assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+ HoodieSchema.createVector(16,
HoodieSchema.Vector.VectorElementType.FLOAT),
+ HoodieSchema.createVector(32,
HoodieSchema.Vector.VectorElementType.FLOAT)
+ ));
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaCompatibility.java
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaCompatibility.java
index 6ff89d6e8586..f3313db6ffd8 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaCompatibility.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaCompatibility.java
@@ -39,6 +39,7 @@ import java.util.stream.Stream;
import static
org.apache.hudi.common.schema.TestHoodieSchemaUtils.EVOLVED_SCHEMA;
import static
org.apache.hudi.common.schema.TestHoodieSchemaUtils.SIMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -814,4 +815,41 @@ public class TestHoodieSchemaCompatibility {
HoodieSchema stringSchema = HoodieSchema.create(HoodieSchemaType.STRING);
assertFalse(HoodieSchemaCompatibility.areSchemasProjectionEquivalent(blob1,
stringSchema));
}
+
+ @Test
+ public void testVectorSchemaCompatibility() {
+ HoodieSchema incomingSchema = HoodieSchema.createRecord("vector_record",
null, null, Arrays.asList(
+ HoodieSchemaField.of("id",
HoodieSchema.create(HoodieSchemaType.STRING), null, null),
+ HoodieSchemaField.of("embedding", HoodieSchema.createVector(4,
HoodieSchema.Vector.VectorElementType.DOUBLE), null, null)
+ ));
+
+ HoodieSchema tableSchema = HoodieSchema.createRecord("vector_record",
null, null, Arrays.asList(
+ HoodieSchemaField.of("id",
HoodieSchema.create(HoodieSchemaType.STRING), null, null),
+ HoodieSchemaField.of("embedding", HoodieSchema.createVector(4,
HoodieSchema.Vector.VectorElementType.DOUBLE), null, null)
+ ));
+
+ HoodieSchemaCompatibilityChecker.SchemaPairCompatibility compatibility =
+
HoodieSchemaCompatibilityChecker.checkReaderWriterCompatibility(incomingSchema,
tableSchema, false);
+
assertEquals(HoodieSchemaCompatibilityChecker.SchemaCompatibilityType.COMPATIBLE,
compatibility.getType());
+ assertDoesNotThrow(() ->
HoodieSchemaCompatibility.checkValidEvolution(incomingSchema, tableSchema));
+ }
+
+ @Test
+ public void testVectorSchemaCompatibilityRejectsElementTypeEvolution() {
+ HoodieSchema incomingSchema = HoodieSchema.createRecord("vector_record",
null, null, Arrays.asList(
+ HoodieSchemaField.of("id",
HoodieSchema.create(HoodieSchemaType.STRING), null, null),
+ HoodieSchemaField.of("embedding", HoodieSchema.createVector(4,
HoodieSchema.Vector.VectorElementType.DOUBLE), null, null)
+ ));
+
+ HoodieSchema tableSchema = HoodieSchema.createRecord("vector_record",
null, null, Arrays.asList(
+ HoodieSchemaField.of("id",
HoodieSchema.create(HoodieSchemaType.STRING), null, null),
+ HoodieSchemaField.of("embedding", HoodieSchema.createVector(4,
HoodieSchema.Vector.VectorElementType.FLOAT), null, null)
+ ));
+
+ HoodieSchemaCompatibilityChecker.SchemaPairCompatibility compatibility =
+
HoodieSchemaCompatibilityChecker.checkReaderWriterCompatibility(incomingSchema,
tableSchema, false);
+
assertEquals(HoodieSchemaCompatibilityChecker.SchemaCompatibilityType.INCOMPATIBLE,
compatibility.getType());
+ assertThrows(SchemaBackwardsCompatibilityException.class,
+ () -> HoodieSchemaCompatibility.checkValidEvolution(incomingSchema,
tableSchema));
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/internal/schema/convert/TestInternalSchemaConverter.java
b/hudi-common/src/test/java/org/apache/hudi/internal/schema/convert/TestInternalSchemaConverter.java
index 1ec7927813f0..44221d229b7f 100644
---
a/hudi-common/src/test/java/org/apache/hudi/internal/schema/convert/TestInternalSchemaConverter.java
+++
b/hudi-common/src/test/java/org/apache/hudi/internal/schema/convert/TestInternalSchemaConverter.java
@@ -22,6 +22,8 @@ package org.apache.hudi.internal.schema.convert;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Types;
import org.junit.jupiter.api.Test;
@@ -111,4 +113,60 @@ public class TestInternalSchemaConverter {
assertEquals(expectedOutput.size(), fieldNames.size());
assertTrue(fieldNames.containsAll(expectedOutput));
}
+
+ @Test
+ public void testVectorTypeRoundTrip() {
+ // Create a HoodieSchema with a VECTOR field
+ HoodieSchema vectorField = HoodieSchema.createVector(128,
HoodieSchema.Vector.VectorElementType.FLOAT);
+ HoodieSchema recordSchema = HoodieSchema.createRecord("vectorRecord",
null, null, Arrays.asList(
+ HoodieSchemaField.of("id",
HoodieSchema.create(HoodieSchemaType.STRING), null, null),
+ HoodieSchemaField.of("embedding", vectorField, null, null)
+ ));
+
+ // HoodieSchema → InternalSchema
+ InternalSchema internalSchema =
InternalSchemaConverter.convert(recordSchema);
+
+ // Verify the InternalSchema has a VectorType
+ Types.VectorType internalVectorType = (Types.VectorType)
internalSchema.findType("embedding");
+ assertEquals(128, internalVectorType.getDimension());
+ assertEquals("FLOAT", internalVectorType.getElementType());
+ assertEquals("FIXED_BYTES", internalVectorType.getStorageBacking());
+
+ // InternalSchema → HoodieSchema
+ HoodieSchema roundTripped =
InternalSchemaConverter.convert(internalSchema, "vectorRecord");
+
+ // Verify vector properties survived the round-trip
+ HoodieSchemaField embeddingField = roundTripped.getFields().stream()
+ .filter(f -> f.name().equals("embedding"))
+ .findFirst().get();
+ HoodieSchema embeddingSchema = embeddingField.schema().getNonNullType();
+ assertEquals(HoodieSchemaType.VECTOR, embeddingSchema.getType());
+    HoodieSchema.Vector roundTrippedVector = (HoodieSchema.Vector) embeddingSchema;
+ assertEquals(128, roundTrippedVector.getDimension());
+    assertEquals(HoodieSchema.Vector.VectorElementType.FLOAT, roundTrippedVector.getVectorElementType());
+    assertEquals(HoodieSchema.Vector.StorageBacking.FIXED_BYTES, roundTrippedVector.getStorageBacking());
+ }
+
+ @Test
+ public void testVectorTypeRoundTripDouble() {
+    HoodieSchema vectorField = HoodieSchema.createVector(64, HoodieSchema.Vector.VectorElementType.DOUBLE);
+    HoodieSchema recordSchema = HoodieSchema.createRecord("vectorDoubleRecord", null, null, Arrays.asList(
+        HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT), null, null),
+ HoodieSchemaField.of("vec", vectorField, null, null)
+ ));
+
+    InternalSchema internalSchema = InternalSchemaConverter.convert(recordSchema);
+    Types.VectorType internalVec = (Types.VectorType) internalSchema.findType("vec");
+ assertEquals(64, internalVec.getDimension());
+ assertEquals("DOUBLE", internalVec.getElementType());
+
+    HoodieSchema roundTripped = InternalSchemaConverter.convert(internalSchema, "vectorDoubleRecord");
+ HoodieSchemaField vecField = roundTripped.getFields().stream()
+ .filter(f -> f.name().equals("vec"))
+ .findFirst().get();
+    HoodieSchema.Vector rtVec = (HoodieSchema.Vector) vecField.schema().getNonNullType();
+ assertEquals(64, rtVec.getDimension());
+    assertEquals(HoodieSchema.Vector.VectorElementType.DOUBLE, rtVec.getVectorElementType());
+    assertEquals(HoodieSchema.Vector.StorageBacking.FIXED_BYTES, rtVec.getStorageBacking());
+ }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 171bd9231557..b0093315306d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -322,4 +322,13 @@ class TestHoodieTableMetadataUtil {
    assertFalse(HoodieTableMetadataUtil.isTimestampMillisField(stringSchema), "Should return false for string");
}
+
+ @Test
+ void testVectorColumnsAreNotSupportedForV2ColumnStats() {
+    HoodieSchema vectorSchema = HoodieSchema.createNullable(HoodieSchema.createVector(128));
+    HoodieSchema stringSchema = HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING));
+
+    assertFalse(HoodieTableMetadataUtil.isColumnTypeSupported(vectorSchema, Option.empty(), HoodieIndexVersion.V2));
+    assertTrue(HoodieTableMetadataUtil.isColumnTypeSupported(stringSchema, Option.empty(), HoodieIndexVersion.V2));
+ }
}
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
index 97428a6819e4..547b02787369 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
@@ -48,6 +48,10 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
super(schema, hoodieSchema.toAvroSchema(), ConvertingGenericData.INSTANCE);
    this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterAvroWriteSupport::new);
    this.properties = properties;
+    String vectorMeta = HoodieSchema.buildVectorColumnsMetadataValue(hoodieSchema);
+    if (!vectorMeta.isEmpty()) {
+      footerMetadata.put(HoodieSchema.PARQUET_VECTOR_COLUMNS_METADATA_KEY, vectorMeta);
+    }
+ }
}
@Override
diff --git a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
index 4f8d88d0f6d9..010a0b7055ae 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
@@ -248,6 +248,18 @@ public class AvroSchemaConverterWithTimestampNTZ extends HoodieAvroParquetSchema
          builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition).length(schema.getFixedSize());
        }
break;
+      case VECTOR:
+        // Vectors are stored as bare FIXED_LEN_BYTE_ARRAY without a Parquet logical type annotation.
+        // Vector semantics (dimension, element type) are resolved from HoodieSchema (the table's
+        // stored schema), not from the Parquet file schema. The reverse direction
+        // (FIXED_LEN_BYTE_ARRAY → HoodieSchema) currently maps to generic FIXED; this is
+        // acceptable because the read path detects vectors from the HoodieSchema, not from Parquet.
+        // TODO: Consider adding VectorLogicalTypeAnnotation for fully self-describing Parquet files.
+        HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) schema;
+        int fixedSize = vectorSchema.getDimension() * vectorSchema.getVectorElementType().getElementSize();
+        builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition).length(fixedSize);
+        break;
case UNION:
return convertUnion(fieldName, schema, repetition, schemaPath);
default:
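As a standalone illustration of the sizing rule in the VECTOR case above (FIXED_LEN_BYTE_ARRAY length = dimension times element width, with the Math.multiplyExact overflow guard the review comments mention), here is a minimal sketch; the enum and class names are hypothetical, not the actual HoodieSchema API:

```java
// Hypothetical stand-ins for HoodieSchema.Vector.VectorElementType and the
// fixed-length sizing done in the Parquet schema converter.
enum ElementType {
  FLOAT(4), DOUBLE(8), INT8(1);

  final int elementSize;

  ElementType(int s) {
    this.elementSize = s;
  }
}

public class VectorByteSize {
  // Math.multiplyExact throws ArithmeticException on int overflow instead of
  // silently wrapping, which is the guard added in this commit.
  static int fixedLen(int dimension, ElementType et) {
    return Math.multiplyExact(dimension, et.elementSize);
  }

  public static void main(String[] args) {
    System.out.println(fixedLen(128, ElementType.FLOAT));  // 512
    System.out.println(fixedLen(64, ElementType.DOUBLE));  // 512
    System.out.println(fixedLen(256, ElementType.INT8));   // 256
  }
}
```

A 128-dim FLOAT vector and a 64-dim DOUBLE vector therefore land on the same 512-byte fixed-length layout, which is why element type must come from the stored schema rather than the Parquet physical type.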
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
index 8d83c9f4223a..9ba25424a9ca 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
@@ -18,13 +18,13 @@
package org.apache.spark.sql.execution.datasources.parquet
import org.apache.hudi.{HoodieFileIndex, HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, HoodieSchemaConversionUtils, HoodieSparkUtils, HoodieTableSchema, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
-import org.apache.hudi.avro.AvroSchemaUtils
import org.apache.hudi.cdc.{CDCFileGroupIterator, HoodieCDCFileGroupSplit, HoodieCDCFileIndex}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.config.{HoodieMemoryConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieFileFormat
+import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.schema.HoodieSchemaUtils
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, ParquetTableSchemaResolver}
import org.apache.hudi.common.table.read.HoodieFileGroupReader
@@ -35,6 +35,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.io.IOUtils
import org.apache.hudi.io.storage.HoodieSparkParquetReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR
+import org.apache.hudi.io.storage.VectorConversionUtils
import org.apache.hudi.storage.StorageConfiguration
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
@@ -47,7 +48,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection}
import org.apache.spark.sql.execution.datasources.{OutputWriterFactory, PartitionedFile, SparkColumnarFileReader}
import org.apache.spark.sql.execution.datasources.orc.OrcUtils
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
@@ -116,6 +117,22 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
*/
private var supportReturningBatch = false
+  /**
+   * Cached result of vector column detection keyed by schema identity.
+   * Avoids re-parsing metadata on repeated supportBatch / readBaseFile calls with the same schema.
+   */
+  @transient private var cachedVectorDetection: (StructType, Map[Int, HoodieSchema.Vector]) = _
+
+  private def detectVectorColumnsCached(schema: StructType): Map[Int, HoodieSchema.Vector] = {
+    if (cachedVectorDetection != null && (cachedVectorDetection._1 eq schema)) {
+      cachedVectorDetection._2
+    } else {
+      val result = detectVectorColumns(schema)
+      cachedVectorDetection = (schema, result)
+      result
+    }
+  }
+
/**
   * Checks if the file format supports vectorized reading, please refer to SPARK-40918.
*
@@ -128,30 +145,38 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
*
*/
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
-    val conf = sparkSession.sessionState.conf
-    val parquetBatchSupported = ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && supportBatchWithTableSchema
-    val orcBatchSupported = conf.orcVectorizedReaderEnabled &&
-      schema.forall(s => OrcUtils.supportColumnarReads(
-        s.dataType, sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled))
-    // TODO: Implement columnar batch reading https://github.com/apache/hudi/issues/17736
-    val lanceBatchSupported = false
-
-    val supportBatch = if (isMultipleBaseFileFormatsEnabled) {
-      parquetBatchSupported && orcBatchSupported
-    } else if (hoodieFileFormat == HoodieFileFormat.PARQUET) {
-      parquetBatchSupported
-    } else if (hoodieFileFormat == HoodieFileFormat.ORC) {
-      orcBatchSupported
-    } else if (hoodieFileFormat == HoodieFileFormat.LANCE) {
-      lanceBatchSupported
+    // Vector columns are stored as FIXED_LEN_BYTE_ARRAY in Parquet but read as ArrayType in Spark.
+    // The binary→array conversion requires row-level access, so disable vectorized batch reading.
+    if (detectVectorColumnsCached(schema).nonEmpty) {
+      supportVectorizedRead = false
+      supportReturningBatch = false
+      false
     } else {
-      throw new HoodieNotSupportedException("Unsupported file format: " + hoodieFileFormat)
+      val conf = sparkSession.sessionState.conf
+      val parquetBatchSupported = ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && supportBatchWithTableSchema
+      val orcBatchSupported = conf.orcVectorizedReaderEnabled &&
+        schema.forall(s => OrcUtils.supportColumnarReads(
+          s.dataType, sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled))
+      // TODO: Implement columnar batch reading https://github.com/apache/hudi/issues/17736
+      val lanceBatchSupported = false
+
+      val supportBatch = if (isMultipleBaseFileFormatsEnabled) {
+        parquetBatchSupported && orcBatchSupported
+      } else if (hoodieFileFormat == HoodieFileFormat.PARQUET) {
+        parquetBatchSupported
+      } else if (hoodieFileFormat == HoodieFileFormat.ORC) {
+        orcBatchSupported
+      } else if (hoodieFileFormat == HoodieFileFormat.LANCE) {
+        lanceBatchSupported
+      } else {
+        throw new HoodieNotSupportedException("Unsupported file format: " + hoodieFileFormat)
+      }
+      supportVectorizedRead = !isIncremental && !isBootstrap && supportBatch
+      supportReturningBatch = !isMOR && supportVectorizedRead
+      logInfo(s"supportReturningBatch: $supportReturningBatch, supportVectorizedRead: $supportVectorizedRead, isIncremental: $isIncremental, " +
+        s"isBootstrap: $isBootstrap, superSupportBatch: $supportBatch")
+      supportReturningBatch
     }
-    supportVectorizedRead = !isIncremental && !isBootstrap && supportBatch
-    supportReturningBatch = !isMOR && supportVectorizedRead
-    logInfo(s"supportReturningBatch: $supportReturningBatch, supportVectorizedRead: $supportVectorizedRead, isIncremental: $isIncremental, " +
-      s"isBootstrap: $isBootstrap, superSupportBatch: $supportBatch")
-    supportReturningBatch
   }
  //for partition columns that we read from the file, we don't want them to be constant column vectors so we
@@ -398,21 +423,60 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
}
}
+  private def detectVectorColumns(schema: StructType): Map[Int, HoodieSchema.Vector] =
+    SparkFileFormatInternalRowReaderContext.detectVectorColumnsFromMetadata(schema)
+
+  private def replaceVectorFieldsWithBinary(schema: StructType, vectorCols: Map[Int, HoodieSchema.Vector]): StructType =
+    SparkFileFormatInternalRowReaderContext.replaceVectorColumnsWithBinary(schema, vectorCols)
+
+  /**
+   * Detects vector columns and replaces them with BinaryType in one step.
+   * @return (modified schema with BinaryType for vectors, vector column ordinal map)
+   */
+  private def withVectorRewrite(schema: StructType): (StructType, Map[Int, HoodieSchema.Vector]) = {
+    val vecs = detectVectorColumns(schema)
+    if (vecs.nonEmpty) (replaceVectorFieldsWithBinary(schema, vecs), vecs) else (schema, vecs)
+  }
+
+  /**
+   * Wraps an iterator to convert binary VECTOR columns back to typed arrays.
+   * The read schema has BinaryType for vector columns; the target schema has ArrayType.
+   */
+  private def wrapWithVectorConversion(iter: Iterator[InternalRow],
+                                       readSchema: StructType,
+                                       targetSchema: StructType,
+                                       vectorCols: Map[Int, HoodieSchema.Vector]): Iterator[InternalRow] = {
+    val vectorProjection = UnsafeProjection.create(targetSchema)
+    val javaVectorCols: java.util.Map[Integer, HoodieSchema.Vector] = vectorCols.map { case (k, v) => (Integer.valueOf(k), v) }.asJava
+    val mapper = VectorConversionUtils.buildRowMapper(readSchema, javaVectorCols, vectorProjection.apply(_))
+    iter.map(mapper.apply(_))
+  }
+
  // executor
  private def readBaseFile(file: PartitionedFile, parquetFileReader: SparkColumnarFileReader, requestedSchema: StructType,
                           remainingPartitionSchema: StructType, fixedPartitionIndexes: Set[Int], requiredSchema: StructType,
                           partitionSchema: StructType, outputSchema: StructType, filters: Seq[Filter],
                           storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow] = {
-    if (remainingPartitionSchema.fields.length == partitionSchema.fields.length) {
+    // Detect vector columns and create modified schemas with BinaryType.
+    // Each schema is detected independently because ordinals are relative to the schema being
+    // modified — outputSchema and requestedSchema may have vector columns at different positions
+    // than requiredSchema (e.g. when partition columns are interleaved).
+    val (modifiedRequiredSchema, vectorCols) = withVectorRewrite(requiredSchema)
+    val hasVectors = vectorCols.nonEmpty
+    val (modifiedOutputSchema, outputVectorCols) = if (hasVectors) withVectorRewrite(outputSchema) else (outputSchema, Map.empty[Int, HoodieSchema.Vector])
+    val (modifiedRequestedSchema, _) = if (hasVectors) withVectorRewrite(requestedSchema) else (requestedSchema, Map.empty[Int, HoodieSchema.Vector])
+
+    val rawIter = if (remainingPartitionSchema.fields.length == partitionSchema.fields.length) {
      //none of partition fields are read from the file, so the reader will do the appending for us
-      parquetFileReader.read(file, requiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType)
+      parquetFileReader.read(file, modifiedRequiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType)
    } else if (remainingPartitionSchema.fields.length == 0) {
      //we read all of the partition fields from the file
      val pfileUtils = sparkAdapter.getSparkPartitionedFileUtils
      //we need to modify the partitioned file so that the partition values are empty
      val modifiedFile = pfileUtils.createPartitionedFile(InternalRow.empty, pfileUtils.getPathFromPartitionedFile(file), file.start, file.length)
      //and we pass an empty schema for the partition schema
-      parquetFileReader.read(modifiedFile, outputSchema, new StructType(), internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType)
+      parquetFileReader.read(modifiedFile, modifiedOutputSchema, new StructType(), internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType)
    } else {
      //need to do an additional projection here. The case in mind is that partition schema is "a,b,c" mandatoryFields is "a,c",
      //then we will read (dataSchema + a + c) and append b. So the final schema will be (data schema + a + c +b)
@@ -420,8 +484,20 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
      val pfileUtils = sparkAdapter.getSparkPartitionedFileUtils
      val partitionValues = getFixedPartitionValues(file.partitionValues, partitionSchema, fixedPartitionIndexes)
      val modifiedFile = pfileUtils.createPartitionedFile(partitionValues, pfileUtils.getPathFromPartitionedFile(file), file.start, file.length)
-      val iter = parquetFileReader.read(modifiedFile, requestedSchema, remainingPartitionSchema, internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType)
-      projectIter(iter, StructType(requestedSchema.fields ++ remainingPartitionSchema.fields), outputSchema)
+      val iter = parquetFileReader.read(modifiedFile, modifiedRequestedSchema, remainingPartitionSchema, internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType)
+      projectIter(iter, StructType(modifiedRequestedSchema.fields ++ remainingPartitionSchema.fields), modifiedOutputSchema)
+    }
+
+    if (hasVectors) {
+      // The raw iterator has BinaryType for vector columns; convert back to ArrayType
+      val readSchema = if (remainingPartitionSchema.fields.length == partitionSchema.fields.length) {
+        StructType(modifiedRequiredSchema.fields ++ partitionSchema.fields)
+      } else {
+        modifiedOutputSchema
+      }
+      wrapWithVectorConversion(rawIter, readSchema, outputSchema, outputVectorCols)
+    } else {
+      rawIter
    }
  }
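The binary→array conversion that wrapWithVectorConversion delegates to VectorConversionUtils amounts to decoding a fixed-length little-endian byte buffer back into a typed array. A minimal standalone sketch of that round trip for FLOAT vectors (the class and method names here are illustrative, not Hudi's actual VectorConversionUtils API):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Illustrative codec for the packed little-endian layout that the read path
// decodes from FIXED_LEN_BYTE_ARRAY; names are hypothetical.
public class VectorCodec {
  // Pack a float vector into dimension * 4 bytes, little-endian.
  static byte[] pack(float[] vec) {
    ByteBuffer buf = ByteBuffer.allocate(vec.length * Float.BYTES).order(ByteOrder.LITTLE_ENDIAN);
    for (float f : vec) {
      buf.putFloat(f);
    }
    return buf.array();
  }

  // Decode the packed bytes back into a typed array of the given dimension.
  static float[] unpack(byte[] bytes, int dimension) {
    ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
    float[] vec = new float[dimension];
    for (int i = 0; i < dimension; i++) {
      vec[i] = buf.getFloat();
    }
    return vec;
  }

  public static void main(String[] args) {
    float[] v = {1.5f, -2.25f, 3.0f};
    System.out.println(Arrays.equals(v, unpack(pack(v), 3))); // true
  }
}
```

Because putFloat/getFloat preserve the IEEE 754 bit pattern exactly, the round trip is lossless, which matches the exact-equality assertions in the new TestVectorDataSource tests.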
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
new file mode 100644
index 000000000000..0522950f0d9f
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.util.HadoopInputFile
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.types._
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.Assertions._
+
+import scala.collection.JavaConverters._
+
+/**
+ * End-to-end tests for vector column support in Hudi.
+ * Tests round-trip data correctness through Spark DataFrames.
+ */
+class TestVectorDataSource extends HoodieSparkClientTestBase {
+
+ var spark: SparkSession = null
+
+ @BeforeEach override def setUp(): Unit = {
+ initPath()
+ initSparkContexts()
+ spark = sqlContext.sparkSession
+ initTestDataGenerator()
+ initHoodieStorage()
+ }
+
+ @AfterEach override def tearDown(): Unit = {
+ cleanupSparkContexts()
+ cleanupTestDataGenerator()
+ cleanupFileSystem()
+ }
+
+  private def vectorMetadata(descriptor: String): Metadata =
+    new MetadataBuilder().putString(HoodieSchema.TYPE_METADATA_FIELD, descriptor).build()
+
+ private def createVectorDf(schema: StructType, data: Seq[Row]): DataFrame =
+ spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+
+  private def writeHudiTable(df: DataFrame, tableName: String, path: String,
+      tableType: String = "COPY_ON_WRITE", precombineField: String = "id",
+      mode: SaveMode = SaveMode.Overwrite, extraOpts: Map[String, String] = Map.empty): Unit = {
+ var writer = df.write.format("hudi")
+ .option(RECORDKEY_FIELD.key, "id")
+ .option(PRECOMBINE_FIELD.key, precombineField)
+ .option(TABLE_NAME.key, tableName)
+ .option(TABLE_TYPE.key, tableType)
+ extraOpts.foreach { case (k, v) => writer = writer.option(k, v) }
+ writer.mode(mode).save(path)
+ }
+
+ private def readHudiTable(path: String): DataFrame =
+ spark.read.format("hudi").load(path)
+
+ @Test
+ def testVectorRoundTrip(): Unit = {
+ // 1. Create schema with vector metadata
+ val metadata = vectorMetadata("VECTOR(128)")
+
+ val schema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
+ nullable = false, metadata),
+ StructField("label", StringType, nullable = true)
+ ))
+
+ // 2. Generate test data (128-dim float vectors)
+ val random = new scala.util.Random(42)
+ val data = (0 until 100).map { i =>
+ val embedding = Array.fill(128)(random.nextFloat())
+ Row(s"key_$i", embedding.toSeq, s"label_$i")
+ }
+
+ val df = createVectorDf(schema, data)
+
+ // 3. Write as COW Hudi table
+ writeHudiTable(df, "vector_test_table", basePath)
+
+ // 4. Read back
+ val readDf = readHudiTable(basePath)
+
+ // 5. Verify row count
+ assertEquals(100, readDf.count())
+
+ // 6. Verify schema preserved
+ val embeddingField = readDf.schema("embedding")
+ assertTrue(embeddingField.dataType.isInstanceOf[ArrayType])
+ val arrayType = embeddingField.dataType.asInstanceOf[ArrayType]
+ assertEquals(FloatType, arrayType.elementType)
+ assertFalse(arrayType.containsNull)
+
+ // 7. Verify vector metadata preserved
+ val readMetadata = embeddingField.metadata
+ assertTrue(readMetadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+ val parsedSchema = HoodieSchema.parseTypeDescriptor(
+ readMetadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+ assertEquals(HoodieSchemaType.VECTOR, parsedSchema.getType)
+ val vectorSchema = parsedSchema.asInstanceOf[HoodieSchema.Vector]
+ assertEquals(128, vectorSchema.getDimension)
+
+ // 8. Verify float values match exactly
+ val originalRows = df.select("id", "embedding").collect()
+ .map(r => (r.getString(0), r.getSeq[Float](1)))
+ .toMap
+
+ val readRows = readDf.select("id", "embedding").collect()
+ .map(r => (r.getString(0), r.getSeq[Float](1)))
+ .toMap
+
+ originalRows.foreach { case (id, origEmbedding) =>
+ val readEmbedding = readRows(id)
+ assertEquals(128, readEmbedding.size, s"Vector size mismatch for $id")
+
+ origEmbedding.zip(readEmbedding).zipWithIndex.foreach {
+ case ((orig, read), idx) =>
+ assertEquals(orig, read, 1e-9f,
+ s"Vector mismatch at $id index $idx: orig=$orig read=$read")
+ }
+ }
+ }
+
+
+ @Test
+ def testNullableVectorField(): Unit = {
+ // Vector column itself nullable (entire array can be null)
+ val metadata = vectorMetadata("VECTOR(32)")
+
+ val schema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
+ nullable = true, metadata) // nullable = true
+ ))
+
+ val data = Seq(
+ Row("key_1", Array.fill(32)(0.5f).toSeq),
+ Row("key_2", null), // null vector
+ Row("key_3", Array.fill(32)(1.0f).toSeq)
+ )
+
+ val df = createVectorDf(schema, data)
+
+ writeHudiTable(df, "nullable_vector_test", basePath + "/nullable")
+
+ val readDf = readHudiTable(basePath + "/nullable")
+ val readRows = readDf.select("id", "embedding").collect()
+
+ // Verify null handling
+ val key2Row = readRows.find(_.getString(0) == "key_2").get
+ assertTrue(key2Row.isNullAt(1), "Null vector not preserved")
+
+ // Verify non-null vectors preserved correctly
+ val key1Row = readRows.find(_.getString(0) == "key_1").get
+ assertFalse(key1Row.isNullAt(1))
+ val key1Embedding = key1Row.getSeq[Float](1)
+ assertEquals(32, key1Embedding.size)
+ assertTrue(key1Embedding.forall(_ == 0.5f))
+
+ val key3Row = readRows.find(_.getString(0) == "key_3").get
+ assertFalse(key3Row.isNullAt(1))
+ val key3Embedding = key3Row.getSeq[Float](1)
+ assertEquals(32, key3Embedding.size)
+ assertTrue(key3Embedding.forall(_ == 1.0f))
+ }
+
+ @Test
+ def testColumnProjectionWithVector(): Unit = {
+ val metadata = vectorMetadata("VECTOR(16)")
+
+ val schema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
+ nullable = false, metadata),
+ StructField("label", StringType, nullable = true),
+ StructField("score", IntegerType, nullable = true)
+ ))
+
+ val data = (0 until 10).map { i =>
+ Row(s"key_$i", Array.fill(16)(i.toFloat).toSeq, s"label_$i", i * 10)
+ }
+
+ val df = createVectorDf(schema, data)
+
+ writeHudiTable(df, "projection_vector_test", basePath + "/projection")
+
+ // Read only non-vector columns (vector column excluded)
+ val nonVectorDf = readHudiTable(basePath + "/projection")
+ .select("id", "label", "score")
+ assertEquals(10, nonVectorDf.count())
+ val row0 = nonVectorDf.filter("id = 'key_0'").collect()(0)
+ assertEquals("label_0", row0.getString(1))
+ assertEquals(0, row0.getInt(2))
+
+ // Read only the vector column with id
+ val vectorOnlyDf = readHudiTable(basePath + "/projection")
+ .select("id", "embedding")
+ assertEquals(10, vectorOnlyDf.count())
+ val vecRow = vectorOnlyDf.filter("id = 'key_5'").collect()(0)
+ val embedding = vecRow.getSeq[Float](1)
+ assertEquals(16, embedding.size)
+ assertTrue(embedding.forall(_ == 5.0f))
+
+ // Read all columns including vector
+ val allDf = readHudiTable(basePath + "/projection")
+ .select("id", "embedding", "label", "score")
+ assertEquals(10, allDf.count())
+ val allRow = allDf.filter("id = 'key_3'").collect()(0)
+ assertEquals("label_3", allRow.getString(2))
+ assertEquals(30, allRow.getInt(3))
+ val allEmbedding = allRow.getSeq[Float](1)
+ assertEquals(16, allEmbedding.size)
+ assertTrue(allEmbedding.forall(_ == 3.0f))
+ }
+
+ @Test
+ def testDoubleVectorRoundTrip(): Unit = {
+ val metadata = vectorMetadata("VECTOR(64, DOUBLE)")
+
+ val schema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("embedding", ArrayType(DoubleType, containsNull = false),
+ nullable = false, metadata),
+ StructField("label", StringType, nullable = true)
+ ))
+
+ val random = new scala.util.Random(123)
+ val data = (0 until 50).map { i =>
+ val embedding = Array.fill(64)(random.nextDouble())
+ Row(s"key_$i", embedding.toSeq, s"label_$i")
+ }
+
+ val df = createVectorDf(schema, data)
+
+ writeHudiTable(df, "double_vector_test", basePath + "/double_vec")
+
+ val readDf = readHudiTable(basePath + "/double_vec")
+ assertEquals(50, readDf.count())
+
+ // Verify schema: ArrayType(DoubleType)
+ val embField = readDf.schema("embedding")
+ val arrType = embField.dataType.asInstanceOf[ArrayType]
+ assertEquals(DoubleType, arrType.elementType)
+
+ // Verify metadata preserved with DOUBLE element type
+ val readMeta = embField.metadata
+ assertTrue(readMeta.contains(HoodieSchema.TYPE_METADATA_FIELD))
+ val parsed = HoodieSchema.parseTypeDescriptor(
+ readMeta.getString(HoodieSchema.TYPE_METADATA_FIELD))
+ assertEquals(HoodieSchemaType.VECTOR, parsed.getType)
+ val vecSchema = parsed.asInstanceOf[HoodieSchema.Vector]
+ assertEquals(64, vecSchema.getDimension)
+    assertEquals(HoodieSchema.Vector.VectorElementType.DOUBLE, vecSchema.getVectorElementType)
+
+ // Verify actual values
+ val origMap = df.select("id", "embedding").collect()
+ .map(r => (r.getString(0), r.getSeq[Double](1))).toMap
+ val readMap = readDf.select("id", "embedding").collect()
+ .map(r => (r.getString(0), r.getSeq[Double](1))).toMap
+
+ origMap.foreach { case (id, orig) =>
+ val read = readMap(id)
+ assertEquals(64, read.size, s"Dimension mismatch for $id")
+ orig.zip(read).zipWithIndex.foreach { case ((o, r), idx) =>
+ assertEquals(o, r, 1e-15, s"Double mismatch at $id[$idx]")
+ }
+ }
+ }
+
+ @Test
+ def testInt8VectorRoundTrip(): Unit = {
+ val metadata = vectorMetadata("VECTOR(256, INT8)")
+
+ val schema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("embedding", ArrayType(ByteType, containsNull = false),
+ nullable = false, metadata)
+ ))
+
+ val random = new scala.util.Random(99)
+ val data = (0 until 30).map { i =>
+ val embedding = Array.fill(256)((random.nextInt(256) - 128).toByte)
+ Row(s"key_$i", embedding.toSeq)
+ }
+
+ val df = createVectorDf(schema, data)
+
+ writeHudiTable(df, "int8_vector_test", basePath + "/int8_vec")
+
+ val readDf = readHudiTable(basePath + "/int8_vec")
+ assertEquals(30, readDf.count())
+
+ // Verify schema: ArrayType(ByteType)
+ val embField = readDf.schema("embedding")
+ val arrType = embField.dataType.asInstanceOf[ArrayType]
+ assertEquals(ByteType, arrType.elementType)
+
+ // Verify metadata
+ val readMeta = embField.metadata
+ val parsed = HoodieSchema.parseTypeDescriptor(
+ readMeta.getString(HoodieSchema.TYPE_METADATA_FIELD))
+ assertEquals(HoodieSchemaType.VECTOR, parsed.getType)
+ val vecSchema = parsed.asInstanceOf[HoodieSchema.Vector]
+ assertEquals(256, vecSchema.getDimension)
+    assertEquals(HoodieSchema.Vector.VectorElementType.INT8, vecSchema.getVectorElementType)
+
+ // Verify byte values
+ val origMap = df.select("id", "embedding").collect()
+ .map(r => (r.getString(0), r.getSeq[Byte](1))).toMap
+ val readMap = readDf.select("id", "embedding").collect()
+ .map(r => (r.getString(0), r.getSeq[Byte](1))).toMap
+
+ origMap.foreach { case (id, orig) =>
+ val read = readMap(id)
+ assertEquals(256, read.size, s"Dimension mismatch for $id")
+      assertArrayEquals(orig.toArray, read.toArray, s"INT8 vector mismatch for $id")
+ }
+ }
+
+ @Test
+ def testMultipleVectorColumns(): Unit = {
+ val floatMeta = vectorMetadata("VECTOR(8)")
+ val doubleMeta = vectorMetadata("VECTOR(4, DOUBLE)")
+
+ val schema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("vec_float", ArrayType(FloatType, containsNull = false),
+ nullable = false, floatMeta),
+ StructField("label", StringType, nullable = true),
+ StructField("vec_double", ArrayType(DoubleType, containsNull = false),
+ nullable = true, doubleMeta)
+ ))
+
+ val data = (0 until 20).map { i =>
+ Row(
+ s"key_$i",
+ Array.fill(8)(i.toFloat).toSeq,
+ s"label_$i",
+ if (i % 3 == 0) null else Array.fill(4)(i.toDouble).toSeq
+ )
+ }
+
+ val df = createVectorDf(schema, data)
+
+ writeHudiTable(df, "multi_vector_test", basePath + "/multi_vec")
+
+ val readDf = readHudiTable(basePath + "/multi_vec")
+ assertEquals(20, readDf.count())
+
+ // Verify both vector columns present with correct types
+ val floatField = readDf.schema("vec_float")
+    assertEquals(FloatType, floatField.dataType.asInstanceOf[ArrayType].elementType)
+ val doubleField = readDf.schema("vec_double")
+    assertEquals(DoubleType, doubleField.dataType.asInstanceOf[ArrayType].elementType)
+
+ // Verify data: row with both vectors
+ val row5 = readDf.select("id", "vec_float", "vec_double")
+ .filter("id = 'key_5'").collect()(0)
+ val fVec = row5.getSeq[Float](1)
+ assertEquals(8, fVec.size)
+ assertTrue(fVec.forall(_ == 5.0f))
+ val dVec = row5.getSeq[Double](2)
+ assertEquals(4, dVec.size)
+ assertTrue(dVec.forall(_ == 5.0))
+
+ // Verify data: row with null double vector (i=0, i%3==0)
+ val row0 = readDf.select("id", "vec_float", "vec_double")
+ .filter("id = 'key_0'").collect()(0)
+ assertFalse(row0.isNullAt(1))
+ assertTrue(row0.isNullAt(2), "Expected null double vector for key_0")
+
+ // Verify projection: select only one vector column
+ val floatOnlyDf = readDf.select("id", "vec_float")
+ assertEquals(20, floatOnlyDf.count())
+ val doubleOnlyDf = readDf.select("id", "vec_double")
+ assertEquals(20, doubleOnlyDf.count())
+ }
+
+ @Test
+ def testMorTableWithVectors(): Unit = {
+ val metadata = vectorMetadata("VECTOR(16)")
+
+ val schema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
+ nullable = false, metadata),
+ StructField("ts", LongType, nullable = false)
+ ))
+
+ // Initial insert
+ val data1 = (0 until 20).map { i =>
+ Row(s"key_$i", Array.fill(16)(1.0f).toSeq, i.toLong)
+ }
+
+ val df1 = createVectorDf(schema, data1)
+
+ writeHudiTable(df1, "mor_vector_test", basePath + "/mor_vec",
+ tableType = "MERGE_ON_READ", precombineField = "ts")
+
+ // Upsert: update some vectors with new values
+ val data2 = (0 until 10).map { i =>
+ Row(s"key_$i", Array.fill(16)(2.0f).toSeq, 100L + i)
+ }
+
+ val df2 = createVectorDf(schema, data2)
+
+ writeHudiTable(df2, "mor_vector_test", basePath + "/mor_vec",
+ tableType = "MERGE_ON_READ", precombineField = "ts", mode = SaveMode.Append)
+
+ // Read the merged view
+ val readDf = readHudiTable(basePath + "/mor_vec")
+ assertEquals(20, readDf.count())
+
+ // Updated rows (key_0 through key_9) should have new vectors
+ val updatedRow = readDf.select("id", "embedding")
+ .filter("id = 'key_5'").collect()(0)
+ val updatedVec = updatedRow.getSeq[Float](1)
+ assertEquals(16, updatedVec.size)
+ assertTrue(updatedVec.forall(_ == 2.0f), "Updated vector should have value 2.0")
+
+ // Non-updated rows (key_10 through key_19) should keep original vectors
+ val origRow = readDf.select("id", "embedding")
+ .filter("id = 'key_15'").collect()(0)
+ val origVec = origRow.getSeq[Float](1)
+ assertEquals(16, origVec.size)
+ assertTrue(origVec.forall(_ == 1.0f), "Non-updated vector should have value 1.0")
+ }
+
+ @Test
+ def testCowUpsertWithVectors(): Unit = {
+ val metadata = vectorMetadata("VECTOR(8)")
+
+ val schema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
+ nullable = false, metadata),
+ StructField("ts", LongType, nullable = false),
+ StructField("name", StringType, nullable = true)
+ ))
+
+ // Initial write
+ val data1 = (0 until 10).map { i =>
+ Row(s"key_$i", Array.fill(8)(0.0f).toSeq, i.toLong, s"name_$i")
+ }
+
+ writeHudiTable(createVectorDf(schema, data1), "cow_upsert_vec_test",
+ basePath + "/cow_upsert", precombineField = "ts")
+
+ // Upsert: update vectors for existing keys + add new keys
+ val data2 = Seq(
+ Row("key_0", Array.fill(8)(9.9f).toSeq, 100L, "updated_0"),
+ Row("key_5", Array.fill(8)(5.5f).toSeq, 100L, "updated_5"),
+ Row("key_10", Array.fill(8)(10.0f).toSeq, 100L, "new_10")
+ )
+
+ writeHudiTable(createVectorDf(schema, data2), "cow_upsert_vec_test",
+ basePath + "/cow_upsert", precombineField = "ts", mode = SaveMode.Append)
+
+ val readDf = readHudiTable(basePath + "/cow_upsert")
+ assertEquals(11, readDf.count())
+
+ // Verify updated key_0
+ val r0 = readDf.select("id", "embedding", "name")
+ .filter("id = 'key_0'").collect()(0)
+ assertTrue(r0.getSeq[Float](1).forall(_ == 9.9f))
+ assertEquals("updated_0", r0.getString(2))
+
+ // Verify non-updated key_3
+ val r3 = readDf.select("id", "embedding", "name")
+ .filter("id = 'key_3'").collect()(0)
+ assertTrue(r3.getSeq[Float](1).forall(_ == 0.0f))
+ assertEquals("name_3", r3.getString(2))
+
+ // Verify new key_10
+ val r10 = readDf.select("id", "embedding", "name")
+ .filter("id = 'key_10'").collect()(0)
+ assertTrue(r10.getSeq[Float](1).forall(_ == 10.0f))
+ assertEquals("new_10", r10.getString(2))
+ }
+
+ @Test
+ def testLargeDimensionVector(): Unit = {
+ val metadata = vectorMetadata("VECTOR(1536)")
+
+ val schema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
+ nullable = false, metadata)
+ ))
+
+ val random = new scala.util.Random(7)
+ val data = (0 until 5).map { i =>
+ Row(s"key_$i", Array.fill(1536)(random.nextFloat()).toSeq)
+ }
+
+ val df = createVectorDf(schema, data)
+
+ writeHudiTable(df, "large_dim_vec_test", basePath + "/large_dim")
+
+ val readDf = readHudiTable(basePath + "/large_dim")
+ assertEquals(5, readDf.count())
+
+ // Verify dimension preserved
+ val readMeta = readDf.schema("embedding").metadata
+ val vecSchema = HoodieSchema.parseTypeDescriptor(
+ readMeta.getString(HoodieSchema.TYPE_METADATA_FIELD)).asInstanceOf[HoodieSchema.Vector]
+ assertEquals(1536, vecSchema.getDimension)
+
+ // Verify values
+ val origMap = df.select("id", "embedding").collect()
+ .map(r => (r.getString(0), r.getSeq[Float](1))).toMap
+ val readMap = readDf.select("id", "embedding").collect()
+ .map(r => (r.getString(0), r.getSeq[Float](1))).toMap
+
+ origMap.foreach { case (id, orig) =>
+ val read = readMap(id)
+ assertEquals(1536, read.size)
+ orig.zip(read).foreach { case (o, r) =>
+ assertEquals(o, r, 1e-9f, s"Mismatch in $id")
+ }
+ }
+ }
+
+ @Test
+ def testSmallDimensionVector(): Unit = {
+ val metadata = vectorMetadata("VECTOR(2)")
+
+ val schema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("coords", ArrayType(FloatType, containsNull = false),
+ nullable = false, metadata)
+ ))
+
+ val data = Seq(
+ Row("a", Seq(1.0f, 2.0f)),
+ Row("b", Seq(-1.5f, 3.14f)),
+ Row("c", Seq(0.0f, Float.MaxValue))
+ )
+
+ val df = createVectorDf(schema, data)
+
+ writeHudiTable(df, "small_dim_test", basePath + "/small_dim")
+
+ val readDf = readHudiTable(basePath + "/small_dim")
+ assertEquals(3, readDf.count())
+
+ val rowA = readDf.select("id", "coords").filter("id = 'a'").collect()(0)
+ val coordsA = rowA.getSeq[Float](1)
+ assertEquals(2, coordsA.size)
+ assertEquals(1.0f, coordsA(0), 1e-9f)
+ assertEquals(2.0f, coordsA(1), 1e-9f)
+
+ val rowC = readDf.select("id", "coords").filter("id = 'c'").collect()(0)
+ val coordsC = rowC.getSeq[Float](1)
+ assertEquals(Float.MaxValue, coordsC(1), 1e-30f)
+ }
+
+ @Test
+ def testVectorWithNonVectorArrayColumn(): Unit = {
+ val vectorMeta = vectorMetadata("VECTOR(4)")
+
+ val schema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
+ nullable = false, vectorMeta),
+ StructField("tags", ArrayType(StringType, containsNull = true),
+ nullable = true)
+ ))
+
+ val data = Seq(
+ Row("k1", Seq(1.0f, 2.0f, 3.0f, 4.0f), Seq("tag1", "tag2")),
+ Row("k2", Seq(5.0f, 6.0f, 7.0f, 8.0f), null),
+ Row("k3", Seq(0.1f, 0.2f, 0.3f, 0.4f), Seq("tag3"))
+ )
+
+ val df = createVectorDf(schema, data)
+
+ writeHudiTable(df, "mixed_array_test", basePath + "/mixed_array")
+
+ val readDf = readHudiTable(basePath + "/mixed_array")
+ assertEquals(3, readDf.count())
+
+ // Vector column should be ArrayType(FloatType) with vector metadata
+ val embField = readDf.schema("embedding")
+ assertTrue(embField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+ assertEquals(FloatType, embField.dataType.asInstanceOf[ArrayType].elementType)
+
+ // Non-vector array column should be ArrayType(StringType) without vector metadata
+ val tagsField = readDf.schema("tags")
+ assertFalse(tagsField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+ assertEquals(StringType, tagsField.dataType.asInstanceOf[ArrayType].elementType)
+
+ // Verify vector data preserved
+ val row1 = readDf.select("id", "embedding", "tags")
+ .filter("id = 'k1'").collect()(0)
+ val emb = row1.getSeq[Float](1)
+ assertEquals(Seq(1.0f, 2.0f, 3.0f, 4.0f), emb)
+ assertEquals(Seq("tag1", "tag2"), row1.getSeq[String](2))
+
+ // Verify null tags preserved
+ val row2 = readDf.select("id", "embedding", "tags")
+ .filter("id = 'k2'").collect()(0)
+ assertFalse(row2.isNullAt(1))
+ assertTrue(row2.isNullAt(2))
+ }
+
+ @Test
+ def testMorWithMultipleUpserts(): Unit = {
+ val metadata = vectorMetadata("VECTOR(4, DOUBLE)")
+
+ val schema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("embedding", ArrayType(DoubleType, containsNull = false),
+ nullable = false, metadata),
+ StructField("ts", LongType, nullable = false)
+ ))
+
+ // Insert batch 1
+ val batch1 = (0 until 10).map { i =>
+ Row(s"key_$i", Array.fill(4)(1.0).toSeq, 1L)
+ }
+ writeHudiTable(createVectorDf(schema, batch1), "mor_multi_upsert_test",
+ basePath + "/mor_multi", tableType = "MERGE_ON_READ", precombineField = "ts")
+
+ // Upsert batch 2: update key_0..key_4
+ val batch2 = (0 until 5).map { i =>
+ Row(s"key_$i", Array.fill(4)(2.0).toSeq, 2L)
+ }
+ writeHudiTable(createVectorDf(schema, batch2), "mor_multi_upsert_test",
+ basePath + "/mor_multi", tableType = "MERGE_ON_READ", precombineField = "ts",
+ mode = SaveMode.Append)
+
+ // Upsert batch 3: update key_0..key_2 again
+ val batch3 = (0 until 3).map { i =>
+ Row(s"key_$i", Array.fill(4)(3.0).toSeq, 3L)
+ }
+ writeHudiTable(createVectorDf(schema, batch3), "mor_multi_upsert_test",
+ basePath + "/mor_multi", tableType = "MERGE_ON_READ", precombineField = "ts",
+ mode = SaveMode.Append)
+
+ val readDf = readHudiTable(basePath + "/mor_multi")
+ assertEquals(10, readDf.count())
+
+ // key_0: updated 3 times → should have value 3.0
+ val r0 = readDf.select("id", "embedding").filter("id = 'key_0'").collect()(0)
+ assertTrue(r0.getSeq[Double](1).forall(_ == 3.0), "key_0 should have latest value 3.0")
+
+ // key_3: updated once (batch 2) → should have value 2.0
+ val r3 = readDf.select("id", "embedding").filter("id = 'key_3'").collect()(0)
+ assertTrue(r3.getSeq[Double](1).forall(_ == 2.0), "key_3 should have value 2.0")
+
+ // key_7: never updated → should have value 1.0
+ val r7 = readDf.select("id", "embedding").filter("id = 'key_7'").collect()(0)
+ assertTrue(r7.getSeq[Double](1).forall(_ == 1.0), "key_7 should have original value 1.0")
+ }
+
+ @Test
+ def testDimensionMismatchOnWrite(): Unit = {
+ // Schema declares VECTOR(8) but data has arrays of length 4
+ val metadata = vectorMetadata("VECTOR(8)")
+
+ val schema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
+ nullable = false, metadata)
+ ))
+
+ val data = Seq(
+ Row("key_1", Seq(1.0f, 2.0f, 3.0f, 4.0f)) // only 4 elements, schema says 8
+ )
+
+ val df = createVectorDf(schema, data)
+
+ val ex = assertThrows(classOf[Exception], () => {
+ writeHudiTable(df, "dim_mismatch_test", basePath + "/dim_mismatch")
+ })
+ // The root cause should mention dimension mismatch
+ var cause: Throwable = ex
+ var foundMismatch = false
+ while (cause != null && !foundMismatch) {
+ if (cause.getMessage != null && cause.getMessage.contains("dimension mismatch")) {
+ foundMismatch = true
+ }
+ cause = cause.getCause
+ }
+ assertTrue(foundMismatch,
+ s"Expected 'dimension mismatch' in exception chain, got: ${ex.getMessage}")
+ }
+
+ @Test
+ def testSchemaEvolutionRejectsDimensionChange(): Unit = {
+ // Write initial table with VECTOR(4)
+ val metadata4 = vectorMetadata("VECTOR(4)")
+
+ val schema4 = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
+ nullable = false, metadata4),
+ StructField("ts", LongType, nullable = false)
+ ))
+
+ val data1 = Seq(Row("key_1", Seq(1.0f, 2.0f, 3.0f, 4.0f), 1L))
+ writeHudiTable(createVectorDf(schema4, data1), "schema_evolve_dim_test",
+ basePath + "/schema_evolve_dim", precombineField = "ts")
+
+ // Now try to write with VECTOR(8) — different dimension should be rejected
+ val metadata8 = vectorMetadata("VECTOR(8)")
+
+ val schema8 = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
+ nullable = false, metadata8),
+ StructField("ts", LongType, nullable = false)
+ ))
+
+ val data2 = Seq(Row("key_2", Seq(1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f, 7.0f, 8.0f), 2L))
+
+ assertThrows(classOf[Exception], () => {
+ writeHudiTable(createVectorDf(schema8, data2), "schema_evolve_dim_test",
+ basePath + "/schema_evolve_dim", precombineField = "ts", mode = SaveMode.Append)
+ })
+ }
+
+ /**
+ * Verifies that vector column metadata is written to the Parquet file footer
+ * under the key hoodie.vector.columns.
+ */
+ @Test
+ def testParquetFooterContainsVectorMetadata(): Unit = {
+ val metadata = vectorMetadata("VECTOR(8)")
+
+ val schema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
+ nullable = false, metadata)
+ ))
+
+ val data = Seq(Row("key_1", Array.fill(8)(1.0f).toSeq))
+ writeHudiTable(createVectorDf(schema, data), "footer_meta_test", basePath + "/footer_meta")
+
+ // Find a .parquet base file and read its footer metadata
+ val conf = spark.sessionState.newHadoopConf()
+ val fs = new Path(basePath + "/footer_meta").getFileSystem(conf)
+ val parquetFiles = fs.listStatus(new Path(basePath + "/footer_meta"))
+ .flatMap(d => Option(fs.listStatus(d.getPath)).getOrElse(Array.empty))
+ .filter(f => f.getPath.getName.endsWith(".parquet") && !f.getPath.getName.startsWith("."))
+
+ assertTrue(parquetFiles.nonEmpty, "Expected at least one parquet file")
+
+ val reader = ParquetFileReader.open(HadoopInputFile.fromPath(parquetFiles.head.getPath, conf))
+ try {
+ val footerMeta = reader.getFileMetaData.getKeyValueMetaData.asScala
+ assertTrue(footerMeta.contains(HoodieSchema.PARQUET_VECTOR_COLUMNS_METADATA_KEY),
+ s"Footer should contain ${HoodieSchema.PARQUET_VECTOR_COLUMNS_METADATA_KEY}, got keys: ${footerMeta.keys.mkString(", ")}")
+
+ val value = footerMeta(HoodieSchema.PARQUET_VECTOR_COLUMNS_METADATA_KEY)
+ assertTrue(value.contains("embedding"), s"Footer value should reference 'embedding' column, got: $value")
+ assertTrue(value.contains("VECTOR"), s"Footer value should contain 'VECTOR' descriptor, got: $value")
+ } finally {
+ reader.close()
+ }
+ }
+
+ @Test
+ def testPartitionedTableWithVector(): Unit = {
+ val metadata = vectorMetadata("VECTOR(4)")
+
+ val schema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
+ nullable = false, metadata),
+ StructField("label", StringType, nullable = true),
+ StructField("category", StringType, nullable = false)
+ ))
+
+ // Two partitions: "catA" and "catB"
+ val data = (0 until 10).map { i =>
+ val category = if (i % 2 == 0) "catA" else "catB"
+ Row(s"key_$i", Array.fill(4)(i.toFloat).toSeq, s"label_$i", category)
+ }
+
+ writeHudiTable(createVectorDf(schema, data), "partitioned_vector_test",
+ basePath + "/partitioned",
+ extraOpts = Map("hoodie.datasource.write.partitionpath.field" -> "category"))
+
+ val readDf = readHudiTable(basePath + "/partitioned")
+ assertEquals(10, readDf.count())
+
+ // Collect all rows and verify each row's vector matches its key
+ val rowMap = readDf.select("id", "embedding", "category").collect()
+ .map(r => r.getString(0) -> (r.getSeq[Float](1), r.getString(2)))
+ .toMap
+
+ for (i <- 0 until 10) {
+ val (vec, cat) = rowMap(s"key_$i")
+ val expectedCat = if (i % 2 == 0) "catA" else "catB"
+ assertEquals(4, vec.size, s"key_$i dimension wrong")
+ assertTrue(vec.forall(_ == i.toFloat),
s"key_$i: expected ${i.toFloat} but got ${vec.head} (ordinal mismatch?)")
+ assertEquals(expectedCat, cat, s"key_$i partition value wrong")
+ }
+
+ // Also verify projection of vector-only across partitions
+ val vecOnly = readDf.select("id", "embedding").collect()
+ .map(r => r.getString(0) -> r.getSeq[Float](1)).toMap
+ for (i <- 0 until 10) {
+ assertTrue(vecOnly(s"key_$i").forall(_ == i.toFloat),
+ s"key_$i projected vector wrong")
+ }
+ }
+
+ @Test
+ def testVectorAsLastColumn(): Unit = {
+ val metadata = vectorMetadata("VECTOR(4)")
+
+ // Vector is at position 4 (last), after several non-vector columns
+ val schema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("col_a", IntegerType, nullable = true),
+ StructField("col_b", StringType, nullable = true),
+ StructField("col_c", DoubleType, nullable = true),
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
+ nullable = false, metadata)
+ ))
+
+ val data = (0 until 10).map { i =>
+ Row(s"key_$i", i, s"str_$i", i.toDouble * 1.5, Array.fill(4)(i.toFloat).toSeq)
+ }
+
+ writeHudiTable(createVectorDf(schema, data), "last_col_vector_test", basePath + "/last_col")
+
+ val readDf = readHudiTable(basePath + "/last_col")
+ assertEquals(10, readDf.count())
+
+ // Read all columns: verify vector and non-vector columns correct
+ val allRows = readDf.select("id", "col_a", "col_b", "embedding").collect()
+ .map(r => r.getString(0) -> r).toMap
+
+ for (i <- 0 until 10) {
+ val row = allRows(s"key_$i")
+ assertEquals(i, row.getInt(1), s"col_a wrong for key_$i")
+ assertEquals(s"str_$i", row.getString(2), s"col_b wrong for key_$i")
+ val vec = row.getSeq[Float](3)
+ assertEquals(4, vec.size, s"key_$i dimension wrong")
+ assertTrue(vec.forall(_ == i.toFloat),
+ s"key_$i vector wrong (ordinal mismatch?): expected ${i.toFloat}, got ${vec.head}")
+ }
+
+ // Project only the vector column (ordinal shifts to 0 in projected schema)
+ val embOnly = readDf.select("id", "embedding").collect()
+ .map(r => r.getString(0) -> r.getSeq[Float](1)).toMap
+ for (i <- 0 until 10) {
+ assertTrue(embOnly(s"key_$i").forall(_ == i.toFloat),
+ s"key_$i projected-only vector wrong")
+ }
+ }
+
+ /**
+ * Schema evolution: adding a new non-vector column to a table that already has a vector column
+ * should succeed. Old rows get null for the new column; vector data must be intact in all rows.
+ */
+ @Test
+ def testSchemaEvolutionAddColumnToVectorTable(): Unit = {
+ val metadata = vectorMetadata("VECTOR(4)")
+
+ val schemaV1 = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
+ nullable = false, metadata),
+ StructField("ts", LongType, nullable = false)
+ ))
+
+ val data1 = (0 until 5).map { i =>
+ Row(s"key_$i", Array.fill(4)(i.toFloat).toSeq, i.toLong)
+ }
+ writeHudiTable(createVectorDf(schemaV1, data1), "schema_evolve_add_col_test",
+ basePath + "/schema_evolve_add", precombineField = "ts",
+ extraOpts = Map("hoodie.schema.on.read.enable" -> "true"))
+
+ // V2: add a new non-vector column
+ val schemaV2 = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
+ nullable = false, metadata),
+ StructField("ts", LongType, nullable = false),
+ StructField("new_col", StringType, nullable = true)
+ ))
+
+ val data2 = (5 until 10).map { i =>
+ Row(s"key_$i", Array.fill(4)(i.toFloat).toSeq, i.toLong, s"v2_$i")
+ }
+ writeHudiTable(createVectorDf(schemaV2, data2), "schema_evolve_add_col_test",
+ basePath + "/schema_evolve_add", precombineField = "ts", mode = SaveMode.Append,
+ extraOpts = Map("hoodie.schema.on.read.enable" -> "true"))
+
+ val readDf = spark.read.format("hudi")
+ .option("hoodie.schema.on.read.enable", "true")
+ .load(basePath + "/schema_evolve_add")
+ assertEquals(10, readDf.count())
+
+ val rowMap = readDf.select("id", "embedding", "new_col").collect()
+ .map(r => r.getString(0) -> r).toMap
+
+ // Old rows (key_0..key_4): vector intact, new_col is null
+ for (i <- 0 until 5) {
+ val row = rowMap(s"key_$i")
+ val vec = row.getSeq[Float](1)
+ assertEquals(4, vec.size)
+ assertTrue(vec.forall(_ == i.toFloat), s"key_$i vector corrupted after schema evolution")
+ assertTrue(row.isNullAt(2), s"key_$i new_col should be null")
+ }
+ // New rows (key_5..key_9): vector intact, new_col has value
+ for (i <- 5 until 10) {
+ val row = rowMap(s"key_$i")
+ val vec = row.getSeq[Float](1)
+ assertEquals(4, vec.size)
+ assertTrue(vec.forall(_ == i.toFloat), s"key_$i vector corrupted after schema evolution")
+ assertEquals(s"v2_$i", row.getString(2), s"key_$i new_col wrong")
+ }
+ }
+
+ /**
+ * Deleting records from a table with a vector column should not affect remaining records.
+ */
+ @Test
+ def testDeleteFromVectorTable(): Unit = {
+ val metadata = vectorMetadata("VECTOR(4)")
+
+ val schema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("embedding", ArrayType(FloatType, containsNull = false),
+ nullable = false, metadata),
+ StructField("ts", LongType, nullable = false)
+ ))
+
+ val data = (0 until 10).map { i =>
+ Row(s"key_$i", Array.fill(4)(i.toFloat).toSeq, i.toLong)
+ }
+ writeHudiTable(createVectorDf(schema, data), "delete_vector_test",
+ basePath + "/delete_vec", precombineField = "ts")
+
+ // Delete key_2, key_5, key_8
+ val deletedKeys = Set("key_2", "key_5", "key_8")
+ val deleteSchema = StructType(Seq(
+ StructField("id", StringType, nullable = false),
+ StructField("ts", LongType, nullable = false)
+ ))
+ val deleteData = deletedKeys.toSeq.map(k => Row(k, 999L))
+ writeHudiTable(createVectorDf(deleteSchema, deleteData), "delete_vector_test",
+ basePath + "/delete_vec", precombineField = "ts", mode = SaveMode.Append,
+ extraOpts = Map(OPERATION.key -> "delete"))
+
+ val readDf = readHudiTable(basePath + "/delete_vec")
+ assertEquals(7, readDf.count(), "Deleted rows should be gone")
+
+ val rowMap = readDf.select("id", "embedding").collect()
+ .map(r => r.getString(0) -> r.getSeq[Float](1)).toMap
+
+ // Deleted keys must not appear
+ deletedKeys.foreach { k =>
+ assertFalse(rowMap.contains(k), s"$k should have been deleted")
+ }
+
+ // Remaining keys must have correct vectors
+ val remaining = (0 until 10).map(i => s"key_$i").filterNot(deletedKeys.contains)
+ remaining.foreach { k =>
+ val i = k.stripPrefix("key_").toInt
+ val vec = rowMap(k)
+ assertEquals(4, vec.size, s"$k dimension wrong")
+ assertTrue(vec.forall(_ == i.toFloat), s"$k vector wrong after delete")
+ }
+ }
+
+ private def assertArrayEquals(expected: Array[Byte], actual: Array[Byte], message: String): Unit = {
+ assertEquals(expected.length, actual.length, s"$message: length mismatch")
+ expected.zip(actual).zipWithIndex.foreach { case ((e, a), idx) =>
+ assertEquals(e, a, s"$message: mismatch at index $idx")
+ }
+ }
+}