This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fc1deb68b0f feat(vector): Support writing VECTOR to parquet and avro 
formats using Spark (#18328)
3fc1deb68b0f is described below

commit 3fc1deb68b0fe7cb5291aa3231deee69f003648c
Author: Rahil C <[email protected]>
AuthorDate: Fri Mar 27 05:49:47 2026 -0400

    feat(vector): Support writing VECTOR to parquet and avro formats using 
Spark (#18328)
    
    * futher changes for getting parquet to write hudi vectors
    
    * trying to resolve type issues during read/write
    
    * finally able to write from spark to hudi to parquet vectors and then read 
them back
    
    * get things working
    
    * fix vector write path to support all element types, add column projection 
test
    
    - Write path (HoodieRowParquetWriteSupport.makeWriter) now switches on
      VectorElementType (FLOAT/DOUBLE/INT8) instead of hardcoding float,
      matching the read paths
    - Remove redundant detectVectorColumns call in readBaseFile by reusing
      vectorCols from requiredSchema for requestedSchema
    - Add testColumnProjectionWithVector covering 3 scenarios: exclude vector,
      vector-only, and all columns
    
    Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
    
    * Address PR review comments for vector Parquet read/write support
    
    - Use VectorLogicalType.VECTOR_BYTE_ORDER instead of hardcoded
      ByteOrder.LITTLE_ENDIAN in all 4 locations (write support, reader,
      Scala reader context, file group format)
    - Add Math.multiplyExact overflow guard for dimension * elementSize
      in HoodieRowParquetWriteSupport
    - Remove unnecessary array clone in HoodieSparkParquetReader
    - Add clarifying comment on non-vector column else branch
    - Fix misleading "float arrays" comment to "typed arrays"
    - Move inline JavaConverters import to top-level in
      SparkFileFormatInternalRowReaderContext
    - Import Metadata at top level instead of fully-qualified reference
    - Consolidate duplicate detectVectorColumns, replaceVectorColumnsWithBinary,
      and convertBinaryToVectorArray into 
SparkFileFormatInternalRowReaderContext
      companion object; HoodieFileGroupReaderBasedFileFormat now delegates
    - Add Javadoc on VectorType explaining it's needed for InternalSchema
      type hierarchy (cannot reuse HoodieSchema.Vector)
    - Clean up unused imports (ByteOrder, ByteBuffer, GenericArrayData,
      StructField, BinaryType, HoodieSchemaType)
    
    Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
    
    * Add comprehensive vector test coverage for all element types and table 
types
    
    New tests added to TestVectorDataSource:
    
    - testDoubleVectorRoundTrip: DOUBLE element type end-to-end (64-dim)
    - testInt8VectorRoundTrip: INT8/byte element type end-to-end (256-dim)
    - testMultipleVectorColumns: two vector columns (float + double) in
      same schema with selective nulls and per-column projection
    - testMorTableWithVectors: MOR table type with insert + upsert,
      verifying merged view returns correct vectors
    - testCowUpsertWithVectors: COW upsert (update existing + insert new)
      verifying vector values after merge
    - testLargeDimensionVector: 1536-dim float vectors (OpenAI embedding
      size) to exercise large buffer allocation
    - testSmallDimensionVector: 2-dim vectors with edge values
      (Float.MaxValue) to verify boundary handling
    - testVectorWithNonVectorArrayColumn: vector column alongside a
      regular ArrayType(StringType) to ensure non-vector arrays are
      not incorrectly treated as vectors
    - testMorWithMultipleUpserts: MOR with 3 successive upsert batches
      of DOUBLE vectors, verifying the latest value wins per key
    
    Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
    
    * Handle vector schema checks and skip vector column stats
    
    * Extract VectorConversionUtils, add write-path dimension validation, fix 
hot-path allocation
    
    - Create shared VectorConversionUtils utility class to eliminate duplicated
      vector conversion logic across HoodieSparkParquetReader,
      SparkFileFormatInternalRowReaderContext, and 
HoodieFileGroupReaderBasedFileFormat
    - Add explicit dimension validation in HoodieRowParquetWriteSupport to 
prevent
      silent data corruption when array length doesn't match declared vector 
dimension
    - Reuse GenericInternalRow in HoodieSparkParquetReader's vector 
post-processing
      loop to reduce GC pressure on large scans
    
    * fix(vector): replace existential type Map[Int, _] with Map[Int, 
HoodieSchema.Vector] to fix Scala 2.12 type inference error
    
    * minor comilation issue fix
    
    * further self review fixes
    
    * fix(vector): fix Scala import ordering style violations
    
    - Move VectorConversionUtils import into hudi group (was misplaced in 
3rdParty)
    - Add blank line between hudi and 3rdParty import groups
    - Add blank line between java and scala import groups
    
    Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
    
    * address balaji comments
    
    * address balaji comment around putting hoodie vector metdata in parquet 
footer
    
    * address all intial core comments
    
    * address perf comments
    
    * remove comment
    
    * get spark 3.3 working
    
    * address voon comment
    
    * reduce test code duplication
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
---
 .../hudi/io/storage/HoodieSparkParquetReader.java  |   26 +-
 .../hudi/io/storage/VectorConversionUtils.java     |  238 +++++
 .../storage/row/HoodieRowParquetWriteSupport.java  |   50 +
 .../SparkFileFormatInternalRowReaderContext.scala  |   89 +-
 .../parquet/HoodieParquetFileFormatHelper.scala    |   35 +-
 .../apache/hudi/common/schema/HoodieSchema.java    |   95 +-
 .../HoodieSchemaComparatorForSchemaEvolution.java  |   10 +
 .../schema/HoodieSchemaCompatibilityChecker.java   |   22 +
 .../java/org/apache/hudi/internal/schema/Type.java |    7 +-
 .../org/apache/hudi/internal/schema/Types.java     |   70 ++
 .../schema/convert/InternalSchemaConverter.java    |   14 +
 .../hudi/metadata/HoodieTableMetadataUtil.java     |    7 +-
 .../hudi/common/schema/TestHoodieSchema.java       |   24 +-
 ...stHoodieSchemaComparatorForSchemaEvolution.java |   18 +
 .../schema/TestHoodieSchemaCompatibility.java      |   38 +
 .../convert/TestInternalSchemaConverter.java       |   58 ++
 .../hudi/metadata/TestHoodieTableMetadataUtil.java |    9 +
 .../apache/hudi/avro/HoodieAvroWriteSupport.java   |    4 +
 .../avro/AvroSchemaConverterWithTimestampNTZ.java  |   12 +
 .../HoodieFileGroupReaderBasedFileFormat.scala     |  134 ++-
 .../hudi/functional/TestVectorDataSource.scala     | 1000 ++++++++++++++++++++
 21 files changed, 1909 insertions(+), 51 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index 160a731cece4..80ed9be8420e 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -64,7 +64,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
 import scala.Option$;
 
@@ -142,10 +144,20 @@ public class HoodieSparkParquetReader implements 
HoodieSparkFileReader {
   public ClosableIterator<UnsafeRow> getUnsafeRowIterator(HoodieSchema 
requestedSchema, List<Filter> readFilters) throws IOException {
     HoodieSchema nonNullSchema = requestedSchema.getNonNullType();
     StructType structSchema = 
HoodieInternalRowUtils.getCachedSchema(nonNullSchema);
+
+    // Detect vector columns: ordinal → Vector schema
+    Map<Integer, HoodieSchema.Vector> vectorColumnInfo = 
VectorConversionUtils.detectVectorColumns(nonNullSchema);
+
+    // For vector columns, replace ArrayType(FloatType) with BinaryType in the 
read schema
+    // so SparkBasicSchemaEvolution sees matching types (file has 
FIXED_LEN_BYTE_ARRAY → BinaryType)
+    StructType readStructSchema = vectorColumnInfo.isEmpty()
+        ? structSchema
+        : VectorConversionUtils.replaceVectorColumnsWithBinary(structSchema, 
vectorColumnInfo);
+
     Option<MessageType> messageSchema = 
Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema));
     boolean enableTimestampFieldRepair = 
storage.getConf().getBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR, true);
     StructType dataStructType = convertToStruct(enableTimestampFieldRepair ? 
SchemaRepair.repairLogicalTypes(getFileSchema(), messageSchema) : 
getFileSchema());
-    SparkBasicSchemaEvolution evolution = new 
SparkBasicSchemaEvolution(dataStructType, structSchema, 
SQLConf.get().sessionLocalTimeZone());
+    SparkBasicSchemaEvolution evolution = new 
SparkBasicSchemaEvolution(dataStructType, readStructSchema, 
SQLConf.get().sessionLocalTimeZone());
     String readSchemaJson = evolution.getRequestSchema().json();
     SQLConf sqlConf = SQLConf.get();
     storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, 
readSchemaJson);
@@ -184,6 +196,18 @@ public class HoodieSparkParquetReader implements 
HoodieSparkFileReader {
     UnsafeProjection projection = evolution.generateUnsafeProjection();
     ParquetReaderIterator<InternalRow> parquetReaderIterator = new 
ParquetReaderIterator<>(reader);
     CloseableMappingIterator<InternalRow, UnsafeRow> projectedIterator = new 
CloseableMappingIterator<>(parquetReaderIterator, projection::apply);
+
+    if (!vectorColumnInfo.isEmpty()) {
+      // Post-process: convert binary VECTOR columns back to typed arrays
+      UnsafeProjection vectorProjection = 
UnsafeProjection.create(structSchema);
+      Function<InternalRow, InternalRow> mapper =
+          VectorConversionUtils.buildRowMapper(readStructSchema, 
vectorColumnInfo, vectorProjection::apply);
+      CloseableMappingIterator<UnsafeRow, UnsafeRow> vectorIterator =
+          new CloseableMappingIterator<>(projectedIterator, row -> (UnsafeRow) 
mapper.apply(row));
+      readerIterators.add(vectorIterator);
+      return vectorIterator;
+    }
+
     readerIterators.add(projectedIterator);
     return projectedIterator;
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java
new file mode 100644
index 000000000000..2bf8e86b5d0f
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+
+import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.types.BinaryType$;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
+/**
+ * Shared utility methods for vector column handling during Parquet read/write.
+ *
+ * Vectors are stored as Parquet FIXED_LEN_BYTE_ARRAY columns. On read, Spark 
maps these
+ * to BinaryType. This class provides the canonical conversion between the 
binary
+ * representation and Spark's typed ArrayData (float[], double[], byte[]).
+ *
+ */
+public final class VectorConversionUtils {
+
+  private VectorConversionUtils() {
+  }
+
+  /**
+   * Detects VECTOR columns in a HoodieSchema record and returns a map of 
field ordinal
+   * to the corresponding {@link HoodieSchema.Vector} schema.
+   *
+   * @param schema a HoodieSchema of type RECORD (or null)
+   * @return map from field index to Vector schema; empty map if schema is 
null or has no vectors
+   */
+  public static Map<Integer, HoodieSchema.Vector> 
detectVectorColumns(HoodieSchema schema) {
+    Map<Integer, HoodieSchema.Vector> vectorColumnInfo = new HashMap<>();
+    if (schema == null) {
+      return vectorColumnInfo;
+    }
+    List<HoodieSchemaField> fields = schema.getFields();
+    for (int i = 0; i < fields.size(); i++) {
+      HoodieSchema fieldSchema = fields.get(i).schema().getNonNullType();
+      if (fieldSchema.getType() == HoodieSchemaType.VECTOR) {
+        vectorColumnInfo.put(i, (HoodieSchema.Vector) fieldSchema);
+      }
+    }
+    return vectorColumnInfo;
+  }
+
+  /**
+   * Detects VECTOR columns from Spark StructType metadata annotations.
+   * Fields with metadata key {@link HoodieSchema#TYPE_METADATA_FIELD} 
starting with "VECTOR"
+   * are parsed and included.
+   *
+   * @param schema Spark StructType
+   * @return map from field index to Vector schema; empty map if no vectors 
found
+   */
+  public static Map<Integer, HoodieSchema.Vector> 
detectVectorColumnsFromMetadata(StructType schema) {
+    Map<Integer, HoodieSchema.Vector> vectorColumnInfo = new HashMap<>();
+    if (schema == null) {
+      return vectorColumnInfo;
+    }
+    StructField[] fields = schema.fields();
+    for (int i = 0; i < fields.length; i++) {
+      StructField field = fields[i];
+      if (field.metadata().contains(HoodieSchema.TYPE_METADATA_FIELD)) {
+        String typeStr = 
field.metadata().getString(HoodieSchema.TYPE_METADATA_FIELD);
+        if (typeStr.startsWith("VECTOR")) {
+          HoodieSchema parsed = HoodieSchema.parseTypeDescriptor(typeStr);
+          if (parsed.getType() == HoodieSchemaType.VECTOR) {
+            vectorColumnInfo.put(i, (HoodieSchema.Vector) parsed);
+          }
+        }
+      }
+    }
+    return vectorColumnInfo;
+  }
+
+  /**
+   * Replaces ArrayType with BinaryType for VECTOR columns so the Parquet 
reader
+   * can read FIXED_LEN_BYTE_ARRAY data without type mismatch.
+   *
+   * @param structType     the original Spark schema
+   * @param vectorColumns  map of ordinal to vector info (only the key set is 
used)
+   * @return a new StructType with vector columns replaced by BinaryType
+   */
+  public static StructType replaceVectorColumnsWithBinary(StructType 
structType, Map<Integer, ?> vectorColumns) {
+    StructField[] fields = structType.fields();
+    StructField[] newFields = new StructField[fields.length];
+    for (int i = 0; i < fields.length; i++) {
+      if (vectorColumns.containsKey(i)) {
+        // Preserve the original field metadata (including hudi_type) so that 
downstream code
+        // calling detectVectorColumnsFromMetadata on the modified schema 
still finds vectors.
+        newFields[i] = new StructField(fields[i].name(), BinaryType$.MODULE$, 
fields[i].nullable(), fields[i].metadata());
+      } else {
+        newFields[i] = fields[i];
+      }
+    }
+    return new StructType(newFields);
+  }
+
+  /**
+   * Converts binary bytes from a FIXED_LEN_BYTE_ARRAY Parquet column back to 
a typed array
+   * based on the vector's element type and dimension.
+   *
+   * @param bytes        raw bytes read from Parquet
+   * @param vectorSchema the vector schema describing dimension and element 
type
+   * @return an ArrayData containing the decoded float[], double[], or byte[] 
array
+   * @throws IllegalArgumentException if byte array length doesn't match 
expected size
+   */
+  public static ArrayData convertBinaryToVectorArray(byte[] bytes, 
HoodieSchema.Vector vectorSchema) {
+    return convertBinaryToVectorArray(bytes, vectorSchema.getDimension(), 
vectorSchema.getVectorElementType());
+  }
+
+  /**
+   * Converts binary bytes from a FIXED_LEN_BYTE_ARRAY Parquet column back to 
a typed array.
+   *
+   * @param bytes    raw bytes read from Parquet
+   * @param dim      vector dimension (number of elements)
+   * @param elemType element type (FLOAT, DOUBLE, or INT8)
+   * @return an ArrayData containing the decoded float[], double[], or byte[] 
array
+   * @throws IllegalArgumentException if byte array length doesn't match 
expected size
+   */
+  public static ArrayData convertBinaryToVectorArray(byte[] bytes, int dim,
+                                                     
HoodieSchema.Vector.VectorElementType elemType) {
+    int expectedSize = dim * elemType.getElementSize();
+    checkArgument(bytes.length == expectedSize,
+        "Vector byte array length mismatch: expected " + expectedSize + " but 
got " + bytes.length);
+    ByteBuffer buffer = 
ByteBuffer.wrap(bytes).order(HoodieSchema.VectorLogicalType.VECTOR_BYTE_ORDER);
+    switch (elemType) {
+      case FLOAT:
+        float[] floats = new float[dim];
+        for (int j = 0; j < dim; j++) {
+          floats[j] = buffer.getFloat();
+        }
+        return new GenericArrayData(floats);
+      case DOUBLE:
+        double[] doubles = new double[dim];
+        for (int j = 0; j < dim; j++) {
+          doubles[j] = buffer.getDouble();
+        }
+        return new GenericArrayData(doubles);
+      case INT8:
+        byte[] int8s = new byte[dim];
+        buffer.get(int8s);
+        // Use UnsafeArrayData to avoid boxing each byte into a Byte object.
+        // GenericArrayData(byte[]) would box every element into Object[].
+        return UnsafeArrayData.fromPrimitiveArray(int8s);
+      default:
+        throw new UnsupportedOperationException(
+            "Unsupported vector element type: " + elemType);
+    }
+  }
+
+  /**
+   * Returns a {@link Function} that converts a single {@link InternalRow} by 
converting binary
+   * vector columns back to typed arrays and then applying the given 
projection callback.
+   *
+   * <p>Ordinals in {@code vectorColumns} must be relative to {@code 
readSchema} — the schema
+   * that has {@code BinaryType} for vector columns (as produced by
+   * {@link #replaceVectorColumnsWithBinary}).
+   *
+   * <p><b>Thread safety:</b> The returned function is NOT thread-safe; it 
reuses a single
+   * {@link GenericInternalRow} buffer across calls. Each call to this factory 
creates its own
+   * buffer, so separate functions returned by separate calls are independent.
+   *
+   * @param readSchema         the Spark schema of incoming rows (BinaryType 
for vector columns)
+   * @param vectorColumns      map of ordinal → Vector schema for vector 
columns, keyed by
+   *                           ordinals relative to {@code readSchema}
+   * @param projectionCallback called with the converted {@link 
GenericInternalRow}; must copy
+   *                           any data it needs to retain (e.g. {@code 
UnsafeProjection::apply})
+   * @return a function that converts one row and returns the projected result
+   */
+  public static Function<InternalRow, InternalRow> buildRowMapper(
+      StructType readSchema,
+      Map<Integer, HoodieSchema.Vector> vectorColumns,
+      Function<InternalRow, InternalRow> projectionCallback) {
+    GenericInternalRow converted = new 
GenericInternalRow(readSchema.fields().length);
+    return row -> {
+      convertRowVectorColumns(row, converted, readSchema, vectorColumns);
+      return projectionCallback.apply(converted);
+    };
+  }
+
+  /**
+   * Converts vector columns in a row from binary (BinaryType) back to typed 
arrays,
+   * copying non-vector columns as-is. The caller must supply a pre-allocated
+   * {@link GenericInternalRow} for reuse across iterations to reduce GC 
pressure.
+   *
+   * @param row           the source row (with BinaryType for vector columns)
+   * @param result        a pre-allocated GenericInternalRow to write into 
(reused across calls)
+   * @param readSchema    the Spark schema of the source row (BinaryType for 
vector columns)
+   * @param vectorColumns map of ordinal to Vector schema for vector columns
+   */
+  public static void convertRowVectorColumns(InternalRow row, 
GenericInternalRow result,
+                                             StructType readSchema,
+                                             Map<Integer, HoodieSchema.Vector> 
vectorColumns) {
+    int numFields = readSchema.fields().length;
+    for (int i = 0; i < numFields; i++) {
+      if (row.isNullAt(i)) {
+        result.setNullAt(i);
+      } else if (vectorColumns.containsKey(i)) {
+        result.update(i, convertBinaryToVectorArray(row.getBinary(i), 
vectorColumns.get(i)));
+      } else {
+        // Non-vector column: copy value as-is using the read schema's data 
type
+        result.update(i, row.get(i, readSchema.apply(i).dataType()));
+      }
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
index be2ed64ece72..9ee2abaec93a 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
@@ -68,6 +68,7 @@ import org.apache.spark.sql.types.YearMonthIntervalType;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.apache.spark.util.VersionUtils;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -163,6 +164,10 @@ public class HoodieRowParquetWriteSupport extends 
WriteSupport<InternalRow> {
       metadata.put("org.apache.spark.legacyDateTime", "");
       metadata.put("org.apache.spark.timeZone", 
SQLConf.get().sessionLocalTimeZone());
     }
+    String vectorMeta = HoodieSchema.buildVectorColumnsMetadataValue(schema);
+    if (!vectorMeta.isEmpty()) {
+      metadata.put(HoodieSchema.PARQUET_VECTOR_COLUMNS_METADATA_KEY, 
vectorMeta);
+    }
     Configuration configurationCopy = new Configuration(configuration);
     configurationCopy.set(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, 
Boolean.toString(writeLegacyListFormat));
     MessageType messageType = convert(structType, schema);
@@ -304,6 +309,43 @@ public class HoodieRowParquetWriteSupport extends 
WriteSupport<InternalRow> {
         }
         recordConsumer.addBinary(Binary.fromReusedByteArray(fixedLengthBytes, 
0, numBytes));
       };
+    } else if (dataType instanceof ArrayType
+            && resolvedSchema != null
+            && resolvedSchema.getType() == HoodieSchemaType.VECTOR) {
+      HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) resolvedSchema;
+      int dimension = vectorSchema.getDimension();
+      HoodieSchema.Vector.VectorElementType elemType = 
vectorSchema.getVectorElementType();
+      int bufferSize = Math.multiplyExact(dimension, 
elemType.getElementSize());
+      // Buffer is reused across rows without copying. 
Binary.fromReusedByteArray wraps the
+      // same backing array. This is safe because Parquet's ColumnWriteStoreV2 
consumes the
+      // bytes before the next write call (same pattern as the decimal path 
with decimalBuffer).
+      ByteBuffer buffer = 
ByteBuffer.allocate(bufferSize).order(HoodieSchema.VectorLogicalType.VECTOR_BYTE_ORDER);
+      return (row, ordinal) -> {
+        ArrayData array = row.getArray(ordinal);
+        ValidationUtils.checkArgument(array.numElements() == dimension,
+            () -> String.format("Vector dimension mismatch: schema expects %d 
elements but got %d", dimension, array.numElements()));
+        buffer.clear();
+        switch (elemType) {
+          case FLOAT:
+            for (int i = 0; i < dimension; i++) {
+              buffer.putFloat(array.getFloat(i));
+            }
+            break;
+          case DOUBLE:
+            for (int i = 0; i < dimension; i++) {
+              buffer.putDouble(array.getDouble(i));
+            }
+            break;
+          case INT8:
+            for (int i = 0; i < dimension; i++) {
+              buffer.put(array.getByte(i));
+            }
+            break;
+          default:
+            throw new UnsupportedOperationException("Unsupported vector 
element type: " + elemType);
+        }
+        recordConsumer.addBinary(Binary.fromReusedByteArray(buffer.array()));
+      };
     } else if (dataType instanceof ArrayType) {
       ValueWriter elementWriter = makeWriter(resolvedSchema == null ? null : 
resolvedSchema.getElementType(), ((ArrayType) dataType).elementType());
       if (!writeLegacyListFormat) {
@@ -489,6 +531,14 @@ public class HoodieRowParquetWriteSupport extends 
WriteSupport<InternalRow> {
           .as(LogicalTypeAnnotation.decimalType(scale, precision))
           .length(Decimal.minBytesForPrecision()[precision])
           .named(structField.name());
+    } else if (dataType instanceof ArrayType
+            && resolvedSchema != null
+            && resolvedSchema.getType() == HoodieSchemaType.VECTOR) {
+      HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) resolvedSchema;
+      int fixedSize = Math.multiplyExact(vectorSchema.getDimension(),
+              vectorSchema.getVectorElementType().getElementSize());
+      return Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+              .length(fixedSize).named(structField.name());
     } else if (dataType instanceof ArrayType) {
       ArrayType arrayType = (ArrayType) dataType;
       DataType elementType = arrayType.elementType();
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index e4d97a2df6a2..88c39656a918 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -30,18 +30,18 @@ import org.apache.hudi.common.table.HoodieTableConfig
 import 
org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.collection.{CachingIterator, 
ClosableIterator, Pair => HPair}
-import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, 
HoodieSparkParquetReader}
+import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, 
HoodieSparkParquetReader, VectorConversionUtils}
 import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, 
StoragePath}
 import org.apache.hudi.util.CloseableInternalRowIterator
 import 
org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter
 import org.apache.spark.sql.HoodieInternalRowUtils
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection}
 import org.apache.spark.sql.execution.datasources.{PartitionedFile, 
SparkColumnarFileReader}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{LongType, MetadataBuilder, StructField, 
StructType}
+import org.apache.spark.sql.types.{ArrayType, ByteType, DoubleType, FloatType, 
LongType, MetadataBuilder, StructField, StructType}
 import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
 
 import scala.collection.JavaConverters._
@@ -81,7 +81,17 @@ class 
SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
       assert(getRecordContext.supportsParquetRowIndex())
     }
     val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
-    val (readSchema, readFilters) = getSchemaAndFiltersForRead(structType, 
hasRowIndexField)
+
+    // Detect VECTOR columns and replace with BinaryType for the Parquet reader
+    // (Parquet stores VECTOR as FIXED_LEN_BYTE_ARRAY which Spark maps to 
BinaryType)
+    val vectorColumnInfo = 
SparkFileFormatInternalRowReaderContext.detectVectorColumns(requiredSchema)
+    val parquetReadStructType = if (vectorColumnInfo.nonEmpty) {
+      
SparkFileFormatInternalRowReaderContext.replaceVectorColumnsWithBinary(structType,
 vectorColumnInfo)
+    } else {
+      structType
+    }
+
+    val (readSchema, readFilters) = 
getSchemaAndFiltersForRead(parquetReadStructType, hasRowIndexField)
     if (FSUtils.isLogFile(filePath)) {
       // NOTE: now only primary key based filtering is supported for log files
       new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
@@ -100,9 +110,16 @@ class 
SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
       } else {
         
org.apache.hudi.common.util.Option.empty[org.apache.parquet.schema.MessageType]()
       }
-      new CloseableInternalRowIterator(baseFileReader.read(fileInfo,
+      val rawIterator = new 
CloseableInternalRowIterator(baseFileReader.read(fileInfo,
         readSchema, StructType(Seq.empty), 
getSchemaHandler.getInternalSchemaOpt,
         readFilters, 
storage.getConf.asInstanceOf[StorageConfiguration[Configuration]], 
tableSchemaOpt))
+
+      // Post-process: convert binary VECTOR columns back to typed arrays
+      if (vectorColumnInfo.nonEmpty) {
+        
SparkFileFormatInternalRowReaderContext.wrapWithVectorConversion(rawIterator, 
vectorColumnInfo, readSchema)
+      } else {
+        rawIterator
+      }
     }
   }
 
@@ -290,4 +307,66 @@ object SparkFileFormatInternalRowReaderContext {
     field.name.equals(ROW_INDEX_TEMPORARY_COLUMN_NAME)
   }
 
+  /**
+   * Detects VECTOR columns from HoodieSchema.
+   * Delegates to [[VectorConversionUtils.detectVectorColumns]].
+   * @return Map of ordinal to Vector schema for VECTOR fields.
+   */
+  private[hudi] def detectVectorColumns(schema: HoodieSchema): Map[Int, 
HoodieSchema.Vector] = {
+    VectorConversionUtils.detectVectorColumns(schema).asScala.map { case (k, 
v) => (k.intValue(), v) }.toMap
+  }
+
+  /**
+   * Detects VECTOR columns from Spark StructType metadata.
+   * Delegates to [[VectorConversionUtils.detectVectorColumnsFromMetadata]].
+   * @return Map of ordinal to Vector schema for VECTOR fields.
+   */
+  def detectVectorColumnsFromMetadata(schema: StructType): Map[Int, 
HoodieSchema.Vector] = {
+    VectorConversionUtils.detectVectorColumnsFromMetadata(schema).asScala.map 
{ case (k, v) => (k.intValue(), v) }.toMap
+  }
+
+  /**
+   * Replaces ArrayType with BinaryType for VECTOR columns so the Parquet 
reader
+   * can read FIXED_LEN_BYTE_ARRAY data without type mismatch.
+   * Delegates to [[VectorConversionUtils.replaceVectorColumnsWithBinary]].
+   */
+  def replaceVectorColumnsWithBinary(structType: StructType, vectorColumns: 
Map[Int, HoodieSchema.Vector]): StructType = {
+    val javaMap = vectorColumns.map { case (k, v) => (Integer.valueOf(k), 
v.asInstanceOf[AnyRef]) }.asJava
+    VectorConversionUtils.replaceVectorColumnsWithBinary(structType, javaMap)
+  }
+
+  /**
+   * Wraps an iterator to convert binary VECTOR columns back to typed arrays.
+   * Unpacks bytes from FIXED_LEN_BYTE_ARRAY into GenericArrayData using the 
canonical vector byte order.
+   * Uses UnsafeProjection to make a defensive copy of each row.
+   */
+  private[hudi] def wrapWithVectorConversion(
+      iterator: ClosableIterator[InternalRow],
+      vectorColumns: Map[Int, HoodieSchema.Vector],
+      readSchema: StructType): ClosableIterator[InternalRow] = {
+    val javaVectorCols: java.util.Map[Integer, HoodieSchema.Vector] =
+      vectorColumns.map { case (k, v) => (Integer.valueOf(k), v) }.asJava
+    // Build output schema: replace BinaryType with the correct ArrayType for 
vector columns
+    val outputFields = readSchema.fields.zipWithIndex.map { case (field, i) =>
+      vectorColumns.get(i) match {
+        case Some(vec) =>
+          val elemType = vec.getVectorElementType match {
+            case HoodieSchema.Vector.VectorElementType.FLOAT => FloatType
+            case HoodieSchema.Vector.VectorElementType.DOUBLE => DoubleType
+            case HoodieSchema.Vector.VectorElementType.INT8 => ByteType
+          }
+          field.copy(dataType = ArrayType(elemType, containsNull = false))
+        case None => field
+      }
+    }
+    val outputSchema = StructType(outputFields)
+    val projection = UnsafeProjection.create(outputSchema)
+    val mapper = VectorConversionUtils.buildRowMapper(readSchema, 
javaVectorCols, projection.apply(_))
+    new ClosableIterator[InternalRow] {
+      override def hasNext: Boolean = iterator.hasNext
+      override def next(): InternalRow = mapper.apply(iterator.next())
+      override def close(): Unit = iterator.close()
+    }
+  }
+
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
index 03f9934f8b20..6b1934f5c4bf 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
@@ -20,18 +20,49 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.parquet.hadoop.metadata.FileMetaData
+import org.apache.parquet.schema.{MessageType, PrimitiveType, Types}
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.spark.sql.execution.datasources.SparkSchemaTransformUtils
 import org.apache.spark.sql.types.{DataType, StructType}
 
+import scala.collection.JavaConverters._
+
 object HoodieParquetFileFormatHelper {
 
   def buildImplicitSchemaChangeInfo(hadoopConf: Configuration,
                                     parquetFileMetaData: FileMetaData,
                                     requiredSchema: StructType): 
(java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, 
DataType]], StructType) = {
-    // Convert Parquet schema to Spark StructType
+    val originalSchema = parquetFileMetaData.getSchema
+
+    // Spark 3.3's ParquetToSparkSchemaConverter throws "Illegal Parquet type: 
FIXED_LEN_BYTE_ARRAY"
+    // for unannotated FLBA columns (e.g., Hudi VECTOR type). This was fixed 
in Spark 3.4
+    // (SPARK-41096 / https://github.com/apache/spark/pull/38628) which maps 
bare FLBA to BinaryType.
+    // On Spark 3.3 only, rewrite bare FLBA to BINARY before conversion.
+    val safeSchema = if (!HoodieSparkUtils.gteqSpark3_4) {
+      rewriteFixedLenByteArrayToBinary(originalSchema)
+    } else {
+      originalSchema
+    }
+
     val convert = new ParquetToSparkSchemaConverter(hadoopConf)
-    val fileStruct = convert.convert(parquetFileMetaData.getSchema)
+    val fileStruct = convert.convert(safeSchema)
     SparkSchemaTransformUtils.buildImplicitSchemaChangeInfo(fileStruct, 
requiredSchema)
   }
+
+  /**
+   * Rewrites bare FIXED_LEN_BYTE_ARRAY columns (no logical type annotation) 
to BINARY.
+   * Columns with annotations (e.g., DECIMAL, UUID) are left untouched.
+   */
+  private def rewriteFixedLenByteArrayToBinary(schema: MessageType): 
MessageType = {
+    val fields = schema.getFields.asScala.map {
+      case pt: PrimitiveType
+        if pt.getPrimitiveTypeName == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
+          && pt.getLogicalTypeAnnotation == null =>
+        Types.primitive(PrimitiveTypeName.BINARY, 
pt.getRepetition).named(pt.getName)
+      case other => other
+    }
+    new MessageType(schema.getName, fields.asJava)
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
index 4cf23ba17dc8..031fd210ab90 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
@@ -115,9 +115,9 @@ public class HoodieSchema implements Serializable {
         if (params.isEmpty()) {
           throw new IllegalArgumentException("VECTOR type descriptor must 
include a dimension parameter");
         }
-        if (params.size() > 2) {
+        if (params.size() > 3) {
           throw new IllegalArgumentException(
-              "VECTOR type descriptor supports at most 2 parameters: dimension 
and optional element type");
+              "VECTOR type descriptor supports at most 3 parameters: 
dimension, optional element type, and optional storage backing");
         }
         int dimension;
         try {
@@ -128,7 +128,10 @@ public class HoodieSchema implements Serializable {
         Vector.VectorElementType elementType = params.size() > 1
             ? Vector.VectorElementType.fromString(params.get(1))
             : Vector.VectorElementType.FLOAT;
-        return createVector(dimension, elementType);
+        Vector.StorageBacking backing = params.size() > 2
+            ? Vector.StorageBacking.fromString(params.get(2))
+            : Vector.StorageBacking.FIXED_BYTES;
+        return createVector(dimension, elementType, backing);
       case BLOB:
         if (!params.isEmpty()) {
           throw new IllegalArgumentException(
@@ -199,6 +202,42 @@ public class HoodieSchema implements Serializable {
   public static final String PARQUET_ARRAY_SPARK = ".array";
   public static final String PARQUET_ARRAY_AVRO = "." + ARRAY_LIST_ELEMENT;
 
+  /**
+   * Parquet file-footer metadata key under which VECTOR column names and type 
descriptors
+   * are recorded. The value is a comma-separated list of {@code 
colName:VECTOR(dim[,elemType])}
+   * entries, e.g. {@code "embedding:VECTOR(128),tags:VECTOR(64,INT8)"}.
+   *
+   * <p>Stored as file-level key-value metadata (Parquet footer) so that any 
reader can
+   * identify vector columns without needing the Hudi schema store.
+   */
+  public static final String PARQUET_VECTOR_COLUMNS_METADATA_KEY = 
"hoodie.vector.columns";
+
+  /**
+   * Builds the value string for {@link #PARQUET_VECTOR_COLUMNS_METADATA_KEY}.
+   *
+   * @param schema a HoodieSchema of type RECORD (or null)
+   * @return comma-separated {@code colName:VECTOR(dim[,elemType])} entries, 
or empty string
+   *         if the schema is null or has no VECTOR columns
+   */
+  public static String buildVectorColumnsMetadataValue(HoodieSchema schema) {
+    if (schema == null || schema.isSchemaNull()) {
+      return "";
+    }
+    List<HoodieSchemaField> fields = schema.getFields();
+    StringBuilder sb = new StringBuilder();
+    for (HoodieSchemaField field : fields) {
+      HoodieSchema fieldSchema = field.schema().getNonNullType();
+      if (fieldSchema.getType() == HoodieSchemaType.VECTOR) {
+        Vector vectorSchema = (Vector) fieldSchema;
+        if (sb.length() > 0) {
+          sb.append(',');
+        }
+        
sb.append(field.name()).append(':').append(vectorSchema.toTypeDescriptor());
+      }
+    }
+    return sb.toString();
+  }
+
   private Schema avroSchema;
   private HoodieSchemaType type;
   private transient List<HoodieSchemaField> fields;
@@ -770,22 +809,50 @@ public class HoodieSchema implements Serializable {
    * @return new HoodieSchema.Vector
    */
   public static HoodieSchema.Vector createVector(int dimension, 
Vector.VectorElementType elementType) {
+    return createVector(dimension, elementType, 
Vector.StorageBacking.FIXED_BYTES);
+  }
+
+  /**
+   * Creates Vector schema with custom dimension, element type, and storage 
backing.
+   *
+   * @param dimension vector dimension (must be > 0)
+   * @param elementType element type
+   * @param storageBacking physical storage format
+   * @return new HoodieSchema.Vector
+   */
+  public static HoodieSchema.Vector createVector(int dimension, 
Vector.VectorElementType elementType,
+                                                  Vector.StorageBacking 
storageBacking) {
     String vectorName = Vector.DEFAULT_NAME + "_" + 
elementType.name().toLowerCase() + "_" + dimension;
-    return createVector(vectorName, dimension, elementType);
+    return createVector(vectorName, dimension, elementType, storageBacking);
   }
 
   /**
    * Creates Vector schema with custom name, dimension, and element type.
+   * Defaults to {@link Vector.StorageBacking#FIXED_BYTES}.
    *
    * @param name FIXED type name (must not be null or empty)
    * @param dimension vector dimension (must be > 0)
-   * @param elementType element type (use {@link 
Vector.VectorElementType#FLOAT} or {@link Vector.VectorElementType#DOUBLE})
+   * @param elementType element type
    * @return new HoodieSchema.Vector
    */
   public static HoodieSchema.Vector createVector(String name, int dimension, 
Vector.VectorElementType elementType) {
+    return createVector(name, dimension, elementType, 
Vector.StorageBacking.FIXED_BYTES);
+  }
+
+  /**
+   * Creates Vector schema with custom name, dimension, element type, and 
storage backing.
+   *
+   * @param name FIXED type name (must not be null or empty)
+   * @param dimension vector dimension (must be > 0)
+   * @param elementType element type
+   * @param storageBacking physical storage format
+   * @return new HoodieSchema.Vector
+   */
+  public static HoodieSchema.Vector createVector(String name, int dimension, 
Vector.VectorElementType elementType,
+                                                  Vector.StorageBacking 
storageBacking) {
     ValidationUtils.checkArgument(name != null && !name.isEmpty(),
         () -> "Vector name must not be null or empty");
-    Schema vectorSchema = Vector.createSchema(name, dimension, elementType);
+    Schema vectorSchema = Vector.createSchema(name, dimension, elementType, 
storageBacking);
     return new HoodieSchema.Vector(vectorSchema);
   }
 
@@ -1820,10 +1887,14 @@ public class HoodieSchema implements Serializable {
      * Default element type (FLOAT) is omitted.
      */
     public String toTypeDescriptor() {
-      if (getVectorElementType() == VectorElementType.FLOAT) {
+      boolean defaultElemType = getVectorElementType() == 
VectorElementType.FLOAT;
+      boolean defaultBacking = getStorageBacking() == 
StorageBacking.FIXED_BYTES;
+      if (defaultElemType && defaultBacking) {
         return "VECTOR(" + getDimension() + ")";
+      } else if (defaultBacking) {
+        return "VECTOR(" + getDimension() + ", " + getVectorElementType() + 
")";
       }
-      return "VECTOR(" + getDimension() + ", " + getVectorElementType() + ")";
+      return "VECTOR(" + getDimension() + ", " + getVectorElementType() + ", " 
+ getStorageBacking() + ")";
     }
 
     /**
@@ -1835,11 +1906,17 @@ public class HoodieSchema implements Serializable {
      * @return new Vector schema
      */
     private static Schema createSchema(String name, int dimension, 
VectorElementType elementType) {
+      return createSchema(name, dimension, elementType, 
StorageBacking.FIXED_BYTES);
+    }
+
+    private static Schema createSchema(String name, int dimension, 
VectorElementType elementType,
+                                        StorageBacking storageBacking) {
       ValidationUtils.checkArgument(dimension > 0,
           () -> "Vector dimension must be positive: " + dimension);
 
       // Validate elementType
       VectorElementType resolvedElementType = elementType != null ? 
elementType : VectorElementType.FLOAT;
+      StorageBacking resolvedBacking = storageBacking != null ? storageBacking 
: StorageBacking.FIXED_BYTES;
 
       // Calculate fixed size: dimension × element size in bytes
       int elementSize = resolvedElementType.getElementSize();
@@ -1849,7 +1926,7 @@ public class HoodieSchema implements Serializable {
       Schema vectorSchema = Schema.createFixed(name, null, null, fixedSize);
 
       // Apply logical type with properties directly to FIXED
-      VectorLogicalType vectorLogicalType = new VectorLogicalType(dimension, 
resolvedElementType.name(), StorageBacking.FIXED_BYTES.name());
+      VectorLogicalType vectorLogicalType = new VectorLogicalType(dimension, 
resolvedElementType.name(), resolvedBacking.name());
       vectorLogicalType.addToSchema(vectorSchema);
 
       return vectorSchema;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java
index e3f28165f0db..c38cf2286370 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java
@@ -164,6 +164,8 @@ public class HoodieSchemaComparatorForSchemaEvolution {
         return mapSchemaEquals(s1, s2);
       case FIXED:
         return fixedSchemaEquals(s1, s2);
+      case VECTOR:
+        return vectorSchemaEquals(s1, s2);
       case UNION:
         return unionSchemaEquals(s1, s2);
       case STRING:
@@ -289,6 +291,14 @@ public class HoodieSchemaComparatorForSchemaEvolution {
     return s1.getName().equals(s2.getName()) && s1.getFixedSize() == 
s2.getFixedSize();
   }
 
+  private static boolean vectorSchemaEquals(HoodieSchema s1, HoodieSchema s2) {
+    HoodieSchema.Vector v1 = (HoodieSchema.Vector) s1;
+    HoodieSchema.Vector v2 = (HoodieSchema.Vector) s2;
+    return v1.getDimension() == v2.getDimension()
+        && v1.getVectorElementType() == v2.getVectorElementType()
+        && v1.getStorageBacking() == v2.getStorageBacking();
+  }
+
   private static boolean decimalSchemaEquals(HoodieSchema s1, HoodieSchema s2) 
{
     HoodieSchema.Decimal d1 = (HoodieSchema.Decimal) s1;
     HoodieSchema.Decimal d2 = (HoodieSchema.Decimal) s2;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibilityChecker.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibilityChecker.java
index 66935b6d1745..6897fc96ab1b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibilityChecker.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibilityChecker.java
@@ -310,6 +310,8 @@ public class HoodieSchemaCompatibilityChecker {
           case FIXED:
             result = result.mergedWith(checkSchemaNames(reader, writer, 
locations));
             return result.mergedWith(checkFixedSize(reader, writer, 
locations));
+          case VECTOR:
+            return result.mergedWith(checkVectorCompatibility(reader, writer, 
locations));
           case DECIMAL:
             return result.mergedWith(checkDecimalWidening(reader, writer, 
locations));
           case ENUM:
@@ -377,6 +379,7 @@ public class HoodieSchemaCompatibilityChecker {
           case MAP:
             return result.mergedWith(typeMismatch(reader, writer, locations));
           case FIXED:
+          case VECTOR:
             return result.mergedWith(typeMismatch(reader, writer, locations));
           case ENUM:
             return result.mergedWith(typeMismatch(reader, writer, locations));
@@ -457,6 +460,25 @@ public class HoodieSchemaCompatibilityChecker {
       return checkDecimalWidening(reader, writer, locations);
     }
 
+    // Convention: "expected" = writer (existing data), "found" = reader 
(evolved schema).
+    // This matches checkFixedSize, checkDecimalWidening, 
checkTimeCompatibility, etc.
+    private SchemaCompatibilityResult checkVectorCompatibility(final 
HoodieSchema reader, final HoodieSchema writer,
+                                                               final 
Deque<LocationInfo> locations) {
+      HoodieSchema.Vector readerVector = (HoodieSchema.Vector) reader;
+      HoodieSchema.Vector writerVector = (HoodieSchema.Vector) writer;
+      if (readerVector.getDimension() != writerVector.getDimension()
+          || readerVector.getVectorElementType() != 
writerVector.getVectorElementType()
+          || readerVector.getStorageBacking() != 
writerVector.getStorageBacking()) {
+        String message = String.format("Vector field '%s' expected dimension: 
%d, elementType: %s, storageBacking: %s, found: dimension: %d, elementType: %s, 
storageBacking: %s",
+            getLocationName(locations, reader.getType()),
+            writerVector.getDimension(), writerVector.getVectorElementType(), 
writerVector.getStorageBacking(),
+            readerVector.getDimension(), readerVector.getVectorElementType(), 
readerVector.getStorageBacking());
+        return 
SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.TYPE_MISMATCH, 
reader, writer,
+            message, asList(locations));
+      }
+      return SchemaCompatibilityResult.compatible();
+    }
+
     private SchemaCompatibilityResult checkDecimalWidening(final HoodieSchema 
reader, final HoodieSchema writer,
                                                            final 
Deque<LocationInfo> locations) {
       boolean isReaderDecimal = reader.getType() == HoodieSchemaType.DECIMAL;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java
index da67b952258e..1d9ae8c1b686 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java
@@ -73,7 +73,12 @@ public interface Type extends Serializable {
     TIME_MILLIS(Integer.class),
     TIMESTAMP_MILLIS(Long.class),
     LOCAL_TIMESTAMP_MILLIS(Long.class),
-    LOCAL_TIMESTAMP_MICROS(Long.class);
+    LOCAL_TIMESTAMP_MICROS(Long.class),
+    // ByteBuffer.class is a placeholder — classTag is currently unused in the 
codebase.
+    // The actual storage format is determined by VectorType.storageBacking 
(e.g.,
+    // PARQUET_FIXED_LEN_BYTE_ARRAY). If new backing types are added, this 
class tag
+    // may need to become backing-dependent or remain unused.
+    VECTOR(ByteBuffer.class);
 
     private final String name;
     private final Class<?> classTag;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java
index 86ec29da9c45..aaac7b51928d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.internal.schema;
 
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.internal.schema.Type.NestedType;
 import org.apache.hudi.internal.schema.Type.PrimitiveType;
 
@@ -298,6 +299,75 @@ public class Types {
     }
   }
 
+  /**
+   * Vector type that preserves dimension, element type, and storage backing
+   * through InternalSchema round-trips.
+   *
+   * <p>This class is part of the InternalSchema type system (separate from 
HoodieSchema)
+   * and follows the same pattern as {@link FixedType}, {@link 
DecimalTypeFixed}, etc.
+   * It cannot be replaced with {@code HoodieSchema.Vector} because they 
belong to
+   * different type hierarchies.
+   */
+  public static class VectorType extends PrimitiveType {
+    private final int dimension;
+    private final String elementType;
+    private final String storageBacking;
+
+    public static VectorType get(int dimension, String elementType, String 
storageBacking) {
+      return new VectorType(dimension, elementType, storageBacking);
+    }
+
+    private VectorType(int dimension, String elementType, String 
storageBacking) {
+      // Validate that the strings correspond to known enum values to fail 
fast on typos
+      HoodieSchema.Vector.VectorElementType.fromString(elementType);
+      HoodieSchema.Vector.StorageBacking.fromString(storageBacking);
+      this.dimension = dimension;
+      this.elementType = elementType;
+      this.storageBacking = storageBacking;
+    }
+
+    public int getDimension() {
+      return dimension;
+    }
+
+    public String getElementType() {
+      return elementType;
+    }
+
+    public String getStorageBacking() {
+      return storageBacking;
+    }
+
+    @Override
+    public TypeID typeId() {
+      return TypeID.VECTOR;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("vector[%d, %s, %s]", dimension, elementType, 
storageBacking);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      } else if (!(o instanceof VectorType)) {
+        return false;
+      }
+
+      VectorType that = (VectorType) o;
+      return dimension == that.dimension
+          && Objects.equals(elementType, that.elementType)
+          && Objects.equals(storageBacking, that.storageBacking);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(VectorType.class, dimension, elementType, 
storageBacking);
+    }
+  }
+
   public abstract static class DecimalBase extends PrimitiveType {
 
     protected final int scale;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java
 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java
index 9783ecac1b5a..c3bbedf319d0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java
@@ -314,6 +314,12 @@ public class InternalSchemaConverter {
         return Types.StringType.get();
       case FIXED:
         return Types.FixedType.getFixed(schema.getFixedSize());
+      case VECTOR:
+        HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) schema;
+        return Types.VectorType.get(
+            vectorSchema.getDimension(),
+            vectorSchema.getVectorElementType().name(),
+            vectorSchema.getStorageBacking().name());
       case BYTES:
         return Types.BinaryType.get();
       case UUID:
@@ -538,6 +544,14 @@ public class InternalSchemaConverter {
         return HoodieSchema.createFixed(name, null, null, 
fixed.getFixedSize());
       }
 
+      case VECTOR: {
+        Types.VectorType vector = (Types.VectorType) primitive;
+        return HoodieSchema.createVector(
+            vector.getDimension(),
+            
HoodieSchema.Vector.VectorElementType.fromString(vector.getElementType()),
+            
HoodieSchema.Vector.StorageBacking.fromString(vector.getStorageBacking()));
+      }
+
       case DECIMAL:
       case DECIMAL_FIXED: {
         Types.DecimalTypeFixed decimal = (Types.DecimalTypeFixed) primitive;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index bfa0c803fe64..2babb429260e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -284,11 +284,14 @@ public class HoodieTableMetadataUtil {
         String fieldName = fieldNameFieldPair.getKey();
         HoodieSchemaField field = fieldNameFieldPair.getValue();
         HoodieSchema fieldSchema = field.schema().getNonNullType();
+        if (!isColumnTypeSupported(fieldSchema, 
Option.of(record.getRecordType()), indexVersion)) {
+          return;
+        }
         ColumnStats colStats = allColumnStats.computeIfAbsent(fieldName, 
ignored -> new ColumnStats(getValueMetadata(fieldSchema, indexVersion)));
         Object fieldValue = collectColumnRangeFieldValue(record, 
colStats.valueMetadata, fieldName, fieldSchema, recordSchema, properties);
 
         colStats.valueCount++;
-        if (fieldValue != null && isColumnTypeSupported(fieldSchema, 
Option.of(record.getRecordType()), indexVersion)) {
+        if (fieldValue != null) {
           // Set the min value of the field
           if (colStats.minValue == null
               || ConvertingGenericData.INSTANCE.compare(fieldValue, 
colStats.minValue, fieldSchema.toAvroSchema()) < 0) {
@@ -2056,7 +2059,7 @@ public class HoodieTableMetadataUtil {
     // Check for precision and scale if the schema has a logical decimal type.
     return type != HoodieSchemaType.RECORD && type != HoodieSchemaType.MAP
         && type != HoodieSchemaType.ARRAY && type != HoodieSchemaType.ENUM
-        && type != HoodieSchemaType.BLOB;
+        && type != HoodieSchemaType.BLOB && type != HoodieSchemaType.VECTOR;
   }
 
   public static Set<String> getInflightMetadataPartitions(HoodieTableConfig 
tableConfig) {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java 
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
index 1032e228034d..1bf7d1621071 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
@@ -2272,6 +2272,18 @@ public class TestHoodieSchema {
     HoodieSchema.Vector vector = (HoodieSchema.Vector) parsed;
     assertEquals(512, vector.getDimension());
     assertEquals(HoodieSchema.Vector.VectorElementType.DOUBLE, 
vector.getVectorElementType());
+    assertEquals(HoodieSchema.Vector.StorageBacking.FIXED_BYTES, 
vector.getStorageBacking());
+  }
+
+  @Test
+  public void testParseTypeDescriptorVectorWithStorageBacking() {
+    // Explicit storageBacking as 3rd param
+    HoodieSchema parsed = HoodieSchema.parseTypeDescriptor("VECTOR(128, FLOAT, 
FIXED_BYTES)");
+    assertEquals(HoodieSchemaType.VECTOR, parsed.getType());
+    HoodieSchema.Vector vector = (HoodieSchema.Vector) parsed;
+    assertEquals(128, vector.getDimension());
+    assertEquals(HoodieSchema.Vector.VectorElementType.FLOAT, 
vector.getVectorElementType());
+    assertEquals(HoodieSchema.Vector.StorageBacking.FIXED_BYTES, 
vector.getStorageBacking());
   }
 
   @Test
@@ -2289,6 +2301,10 @@ public class TestHoodieSchema {
     assertEquals(HoodieSchemaType.VECTOR, parsed.getType());
     assertEquals(256, parsedVector.getDimension());
     assertEquals(HoodieSchema.Vector.VectorElementType.FLOAT, 
parsedVector.getVectorElementType());
+    assertEquals(HoodieSchema.Vector.StorageBacking.FIXED_BYTES, 
parsedVector.getStorageBacking());
+
+    // Default backing should not appear in descriptor string
+    assertFalse(typeString.contains("FIXED_BYTES"), "Default storageBacking 
should be omitted from descriptor");
 
     // Non-default element type round-trip
     HoodieSchema.Vector vectorDouble = HoodieSchema.createVector(64, 
HoodieSchema.Vector.VectorElementType.DOUBLE);
@@ -2299,6 +2315,7 @@ public class TestHoodieSchema {
     assertEquals(HoodieSchemaType.VECTOR, parsedDouble.getType());
     assertEquals(64, parsedDoubleVector.getDimension());
     assertEquals(HoodieSchema.Vector.VectorElementType.DOUBLE, 
parsedDoubleVector.getVectorElementType());
+    assertEquals(HoodieSchema.Vector.StorageBacking.FIXED_BYTES, 
parsedDoubleVector.getStorageBacking());
   }
 
   @Test
@@ -2318,12 +2335,15 @@ public class TestHoodieSchema {
     assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("VECTOR"));
     assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("VECTOR()"));
 
-    // Too many parameters
-    assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("VECTOR(128, FLOAT, extra)"));
+    // Too many parameters (4+)
+    assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("VECTOR(128, FLOAT, FIXED_BYTES, extra)"));
 
     // Invalid element type
     assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("VECTOR(128, INVALID)"));
 
+    // Invalid storage backing
+    assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("VECTOR(128, FLOAT, UNKNOWN_BACKING)"));
+
     // Zero and negative dimensions
     assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("VECTOR(0)"));
     assertThrows(IllegalArgumentException.class, () -> 
HoodieSchema.parseTypeDescriptor("VECTOR(-1)"));
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java
index 29d53f0c146c..09724cc54e91 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java
@@ -611,4 +611,22 @@ class TestHoodieSchemaComparatorForSchemaEvolution {
     // BLOB with fields in different order should not be equal (order matters)
     assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(blob1, 
blobWithDifferentOrder));
   }
+
+  @Test
+  void testVectorSchemaEquality() {
+    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.createVector(16, 
HoodieSchema.Vector.VectorElementType.FLOAT),
+        HoodieSchema.createVector(16, 
HoodieSchema.Vector.VectorElementType.FLOAT)
+    ));
+
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.createVector(16, 
HoodieSchema.Vector.VectorElementType.FLOAT),
+        HoodieSchema.createVector(16, 
HoodieSchema.Vector.VectorElementType.DOUBLE)
+    ));
+
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.createVector(16, 
HoodieSchema.Vector.VectorElementType.FLOAT),
+        HoodieSchema.createVector(32, 
HoodieSchema.Vector.VectorElementType.FLOAT)
+    ));
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaCompatibility.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaCompatibility.java
index 6ff89d6e8586..f3313db6ffd8 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaCompatibility.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaCompatibility.java
@@ -39,6 +39,7 @@ import java.util.stream.Stream;
 import static 
org.apache.hudi.common.schema.TestHoodieSchemaUtils.EVOLVED_SCHEMA;
 import static 
org.apache.hudi.common.schema.TestHoodieSchemaUtils.SIMPLE_SCHEMA;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -814,4 +815,41 @@ public class TestHoodieSchemaCompatibility {
     HoodieSchema stringSchema = HoodieSchema.create(HoodieSchemaType.STRING);
     
assertFalse(HoodieSchemaCompatibility.areSchemasProjectionEquivalent(blob1, 
stringSchema));
   }
+
+  @Test
+  public void testVectorSchemaCompatibility() {
+    HoodieSchema incomingSchema = HoodieSchema.createRecord("vector_record", 
null, null, Arrays.asList(
+        HoodieSchemaField.of("id", 
HoodieSchema.create(HoodieSchemaType.STRING), null, null),
+        HoodieSchemaField.of("embedding", HoodieSchema.createVector(4, 
HoodieSchema.Vector.VectorElementType.DOUBLE), null, null)
+    ));
+
+    HoodieSchema tableSchema = HoodieSchema.createRecord("vector_record", 
null, null, Arrays.asList(
+        HoodieSchemaField.of("id", 
HoodieSchema.create(HoodieSchemaType.STRING), null, null),
+        HoodieSchemaField.of("embedding", HoodieSchema.createVector(4, 
HoodieSchema.Vector.VectorElementType.DOUBLE), null, null)
+    ));
+
+    HoodieSchemaCompatibilityChecker.SchemaPairCompatibility compatibility =
+        
HoodieSchemaCompatibilityChecker.checkReaderWriterCompatibility(incomingSchema, 
tableSchema, false);
+    
assertEquals(HoodieSchemaCompatibilityChecker.SchemaCompatibilityType.COMPATIBLE,
 compatibility.getType());
+    assertDoesNotThrow(() -> 
HoodieSchemaCompatibility.checkValidEvolution(incomingSchema, tableSchema));
+  }
+
+  @Test
+  public void testVectorSchemaCompatibilityRejectsElementTypeEvolution() {
+    HoodieSchema incomingSchema = HoodieSchema.createRecord("vector_record", 
null, null, Arrays.asList(
+        HoodieSchemaField.of("id", 
HoodieSchema.create(HoodieSchemaType.STRING), null, null),
+        HoodieSchemaField.of("embedding", HoodieSchema.createVector(4, 
HoodieSchema.Vector.VectorElementType.DOUBLE), null, null)
+    ));
+
+    HoodieSchema tableSchema = HoodieSchema.createRecord("vector_record", 
null, null, Arrays.asList(
+        HoodieSchemaField.of("id", 
HoodieSchema.create(HoodieSchemaType.STRING), null, null),
+        HoodieSchemaField.of("embedding", HoodieSchema.createVector(4, 
HoodieSchema.Vector.VectorElementType.FLOAT), null, null)
+    ));
+
+    HoodieSchemaCompatibilityChecker.SchemaPairCompatibility compatibility =
+        
HoodieSchemaCompatibilityChecker.checkReaderWriterCompatibility(incomingSchema, 
tableSchema, false);
+    
assertEquals(HoodieSchemaCompatibilityChecker.SchemaCompatibilityType.INCOMPATIBLE,
 compatibility.getType());
+    assertThrows(SchemaBackwardsCompatibilityException.class,
+        () -> HoodieSchemaCompatibility.checkValidEvolution(incomingSchema, 
tableSchema));
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/internal/schema/convert/TestInternalSchemaConverter.java
 
b/hudi-common/src/test/java/org/apache/hudi/internal/schema/convert/TestInternalSchemaConverter.java
index 1ec7927813f0..44221d229b7f 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/internal/schema/convert/TestInternalSchemaConverter.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/internal/schema/convert/TestInternalSchemaConverter.java
@@ -22,6 +22,8 @@ package org.apache.hudi.internal.schema.convert;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Types;
 
 import org.junit.jupiter.api.Test;
 
@@ -111,4 +113,60 @@ public class TestInternalSchemaConverter {
     assertEquals(expectedOutput.size(), fieldNames.size());
     assertTrue(fieldNames.containsAll(expectedOutput));
   }
+
+  @Test
+  public void testVectorTypeRoundTrip() {
+    // Create a HoodieSchema with a VECTOR field
+    HoodieSchema vectorField = HoodieSchema.createVector(128, 
HoodieSchema.Vector.VectorElementType.FLOAT);
+    HoodieSchema recordSchema = HoodieSchema.createRecord("vectorRecord", 
null, null, Arrays.asList(
+        HoodieSchemaField.of("id", 
HoodieSchema.create(HoodieSchemaType.STRING), null, null),
+        HoodieSchemaField.of("embedding", vectorField, null, null)
+    ));
+
+    // HoodieSchema → InternalSchema
+    InternalSchema internalSchema = 
InternalSchemaConverter.convert(recordSchema);
+
+    // Verify the InternalSchema has a VectorType
+    Types.VectorType internalVectorType = (Types.VectorType) 
internalSchema.findType("embedding");
+    assertEquals(128, internalVectorType.getDimension());
+    assertEquals("FLOAT", internalVectorType.getElementType());
+    assertEquals("FIXED_BYTES", internalVectorType.getStorageBacking());
+
+    // InternalSchema → HoodieSchema
+    HoodieSchema roundTripped = 
InternalSchemaConverter.convert(internalSchema, "vectorRecord");
+
+    // Verify vector properties survived the round-trip
+    HoodieSchemaField embeddingField = roundTripped.getFields().stream()
+        .filter(f -> f.name().equals("embedding"))
+        .findFirst().get();
+    HoodieSchema embeddingSchema = embeddingField.schema().getNonNullType();
+    assertEquals(HoodieSchemaType.VECTOR, embeddingSchema.getType());
+    HoodieSchema.Vector roundTrippedVector = (HoodieSchema.Vector) 
embeddingSchema;
+    assertEquals(128, roundTrippedVector.getDimension());
+    assertEquals(HoodieSchema.Vector.VectorElementType.FLOAT, 
roundTrippedVector.getVectorElementType());
+    assertEquals(HoodieSchema.Vector.StorageBacking.FIXED_BYTES, 
roundTrippedVector.getStorageBacking());
+  }
+
+  @Test
+  public void testVectorTypeRoundTripDouble() {
+    HoodieSchema vectorField = HoodieSchema.createVector(64, 
HoodieSchema.Vector.VectorElementType.DOUBLE);
+    HoodieSchema recordSchema = 
HoodieSchema.createRecord("vectorDoubleRecord", null, null, Arrays.asList(
+        HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT), 
null, null),
+        HoodieSchemaField.of("vec", vectorField, null, null)
+    ));
+
+    InternalSchema internalSchema = 
InternalSchemaConverter.convert(recordSchema);
+    Types.VectorType internalVec = (Types.VectorType) 
internalSchema.findType("vec");
+    assertEquals(64, internalVec.getDimension());
+    assertEquals("DOUBLE", internalVec.getElementType());
+
+    HoodieSchema roundTripped = 
InternalSchemaConverter.convert(internalSchema, "vectorDoubleRecord");
+    HoodieSchemaField vecField = roundTripped.getFields().stream()
+        .filter(f -> f.name().equals("vec"))
+        .findFirst().get();
+    HoodieSchema.Vector rtVec = (HoodieSchema.Vector) 
vecField.schema().getNonNullType();
+    assertEquals(64, rtVec.getDimension());
+    assertEquals(HoodieSchema.Vector.VectorElementType.DOUBLE, 
rtVec.getVectorElementType());
+    assertEquals(HoodieSchema.Vector.StorageBacking.FIXED_BYTES, 
rtVec.getStorageBacking());
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 171bd9231557..b0093315306d 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -322,4 +322,13 @@ class TestHoodieTableMetadataUtil {
     assertFalse(HoodieTableMetadataUtil.isTimestampMillisField(stringSchema),
         "Should return false for string");
   }
+
+  @Test
+  void testVectorColumnsAreNotSupportedForV2ColumnStats() {
+    HoodieSchema vectorSchema = 
HoodieSchema.createNullable(HoodieSchema.createVector(128));
+    HoodieSchema stringSchema = 
HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING));
+
+    assertFalse(HoodieTableMetadataUtil.isColumnTypeSupported(vectorSchema, 
Option.empty(), HoodieIndexVersion.V2));
+    assertTrue(HoodieTableMetadataUtil.isColumnTypeSupported(stringSchema, 
Option.empty(), HoodieIndexVersion.V2));
+  }
 }
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
index 97428a6819e4..547b02787369 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
@@ -48,6 +48,10 @@ public class HoodieAvroWriteSupport<T> extends 
AvroWriteSupport<T> {
     super(schema, hoodieSchema.toAvroSchema(), ConvertingGenericData.INSTANCE);
     this.bloomFilterWriteSupportOpt = 
bloomFilterOpt.map(HoodieBloomFilterAvroWriteSupport::new);
     this.properties = properties;
+    String vectorMeta = 
HoodieSchema.buildVectorColumnsMetadataValue(hoodieSchema);
+    if (!vectorMeta.isEmpty()) {
+      footerMetadata.put(HoodieSchema.PARQUET_VECTOR_COLUMNS_METADATA_KEY, 
vectorMeta);
+    }
   }
 
   @Override
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
 
b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
index 4f8d88d0f6d9..010a0b7055ae 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
@@ -248,6 +248,18 @@ public class AvroSchemaConverterWithTimestampNTZ extends 
HoodieAvroParquetSchema
           builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, 
repetition).length(schema.getFixedSize());
         }
         break;
+      case VECTOR:
+        // Vectors are stored as bare FIXED_LEN_BYTE_ARRAY without a Parquet 
logical type annotation.
+        // Vector semantics (dimension, element type) are resolved from 
HoodieSchema (the table's
+        // stored schema), not from the Parquet file schema. The reverse 
direction
+        // (FIXED_LEN_BYTE_ARRAY → HoodieSchema) currently maps to generic 
FIXED; this is
+        // acceptable because the read path detects vectors from the 
HoodieSchema, not from Parquet.
+        // TODO: Consider adding VectorLogicalTypeAnnotation for fully 
self-describing Parquet files.
+        HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) schema;
+        int fixedSize = vectorSchema.getDimension()
+                * vectorSchema.getVectorElementType().getElementSize();
+        builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, 
repetition).length(fixedSize);
+        break;
       case UNION:
         return convertUnion(fieldName, schema, repetition, schemaPath);
       default:
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
index 8d83c9f4223a..9ba25424a9ca 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
@@ -18,13 +18,13 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.hudi.{HoodieFileIndex, HoodiePartitionCDCFileGroupMapping, 
HoodiePartitionFileSliceMapping, HoodieSchemaConversionUtils, HoodieSparkUtils, 
HoodieTableSchema, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
-import org.apache.hudi.avro.AvroSchemaUtils
 import org.apache.hudi.cdc.{CDCFileGroupIterator, HoodieCDCFileGroupSplit, 
HoodieCDCFileIndex}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.config.{HoodieMemoryConfig, TypedProperties}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieFileFormat
+import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.schema.HoodieSchemaUtils
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
ParquetTableSchemaResolver}
 import org.apache.hudi.common.table.read.HoodieFileGroupReader
@@ -35,6 +35,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.io.IOUtils
 import 
org.apache.hudi.io.storage.HoodieSparkParquetReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR
+import org.apache.hudi.io.storage.VectorConversionUtils
 import org.apache.hudi.storage.StorageConfiguration
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
 
@@ -47,7 +48,7 @@ import org.apache.spark.internal.Logging
 import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection}
 import org.apache.spark.sql.execution.datasources.{OutputWriterFactory, 
PartitionedFile, SparkColumnarFileReader}
 import org.apache.spark.sql.execution.datasources.orc.OrcUtils
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, 
OnHeapColumnVector}
@@ -116,6 +117,22 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
    */
   private var supportReturningBatch = false
 
+  /**
+   * Cached result of vector column detection keyed by schema identity.
+   * Avoids re-parsing metadata on repeated supportBatch / readBaseFile calls 
with the same schema.
+   */
+  @transient private var cachedVectorDetection: (StructType, Map[Int, 
HoodieSchema.Vector]) = _
+
+  private def detectVectorColumnsCached(schema: StructType): Map[Int, 
HoodieSchema.Vector] = {
+    if (cachedVectorDetection != null && (cachedVectorDetection._1 eq schema)) 
{
+      cachedVectorDetection._2
+    } else {
+      val result = detectVectorColumns(schema)
+      cachedVectorDetection = (schema, result)
+      result
+    }
+  }
+
   /**
    * Checks if the file format supports vectorized reading, please refer to 
SPARK-40918.
    *
@@ -128,30 +145,38 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
    *
    */
   override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
-    val conf = sparkSession.sessionState.conf
-    val parquetBatchSupported = 
ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && 
supportBatchWithTableSchema
-    val orcBatchSupported = conf.orcVectorizedReaderEnabled &&
-      schema.forall(s => OrcUtils.supportColumnarReads(
-        s.dataType, 
sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled))
-    // TODO: Implement columnar batch reading 
https://github.com/apache/hudi/issues/17736
-    val lanceBatchSupported = false
-
-    val supportBatch = if (isMultipleBaseFileFormatsEnabled) {
-      parquetBatchSupported && orcBatchSupported
-    } else if (hoodieFileFormat == HoodieFileFormat.PARQUET) {
-      parquetBatchSupported
-    } else if (hoodieFileFormat == HoodieFileFormat.ORC) {
-      orcBatchSupported
-    } else if (hoodieFileFormat == HoodieFileFormat.LANCE) {
-      lanceBatchSupported
+    // Vector columns are stored as FIXED_LEN_BYTE_ARRAY in Parquet but read 
as ArrayType in Spark.
+    // The binary→array conversion requires row-level access, so disable 
vectorized batch reading.
+    if (detectVectorColumnsCached(schema).nonEmpty) {
+      supportVectorizedRead = false
+      supportReturningBatch = false
+      false
     } else {
-      throw new HoodieNotSupportedException("Unsupported file format: " + 
hoodieFileFormat)
+      val conf = sparkSession.sessionState.conf
+      val parquetBatchSupported = 
ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && 
supportBatchWithTableSchema
+      val orcBatchSupported = conf.orcVectorizedReaderEnabled &&
+        schema.forall(s => OrcUtils.supportColumnarReads(
+          s.dataType, 
sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled))
+      // TODO: Implement columnar batch reading 
https://github.com/apache/hudi/issues/17736
+      val lanceBatchSupported = false
+
+      val supportBatch = if (isMultipleBaseFileFormatsEnabled) {
+        parquetBatchSupported && orcBatchSupported
+      } else if (hoodieFileFormat == HoodieFileFormat.PARQUET) {
+        parquetBatchSupported
+      } else if (hoodieFileFormat == HoodieFileFormat.ORC) {
+        orcBatchSupported
+      } else if (hoodieFileFormat == HoodieFileFormat.LANCE) {
+        lanceBatchSupported
+      } else {
+        throw new HoodieNotSupportedException("Unsupported file format: " + 
hoodieFileFormat)
+      }
+      supportVectorizedRead = !isIncremental && !isBootstrap && supportBatch
+      supportReturningBatch = !isMOR && supportVectorizedRead
+      logInfo(s"supportReturningBatch: $supportReturningBatch, 
supportVectorizedRead: $supportVectorizedRead, isIncremental: $isIncremental, " 
+
+        s"isBootstrap: $isBootstrap, superSupportBatch: $supportBatch")
+      supportReturningBatch
     }
-    supportVectorizedRead = !isIncremental && !isBootstrap && supportBatch
-    supportReturningBatch = !isMOR && supportVectorizedRead
-    logInfo(s"supportReturningBatch: $supportReturningBatch, 
supportVectorizedRead: $supportVectorizedRead, isIncremental: $isIncremental, " 
+
-      s"isBootstrap: $isBootstrap, superSupportBatch: $supportBatch")
-    supportReturningBatch
   }
 
   //for partition columns that we read from the file, we don't want them to be 
constant column vectors so we
@@ -398,21 +423,60 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
     }
   }
 
+  private def detectVectorColumns(schema: StructType): Map[Int, 
HoodieSchema.Vector] =
+    
SparkFileFormatInternalRowReaderContext.detectVectorColumnsFromMetadata(schema)
+
+  private def replaceVectorFieldsWithBinary(schema: StructType, vectorCols: 
Map[Int, HoodieSchema.Vector]): StructType =
+    
SparkFileFormatInternalRowReaderContext.replaceVectorColumnsWithBinary(schema, 
vectorCols)
+
+  /**
+   * Detects vector columns and replaces them with BinaryType in one step.
+   * @return (modified schema with BinaryType for vectors, vector column 
ordinal map)
+   */
+  private def withVectorRewrite(schema: StructType): (StructType, Map[Int, 
HoodieSchema.Vector]) = {
+    val vecs = detectVectorColumns(schema)
+    if (vecs.nonEmpty) (replaceVectorFieldsWithBinary(schema, vecs), vecs) 
else (schema, vecs)
+  }
+
+  /**
+   * Wraps an iterator to convert binary VECTOR columns back to typed arrays.
+   * The read schema has BinaryType for vector columns; the target schema has 
ArrayType.
+   */
+  private def wrapWithVectorConversion(iter: Iterator[InternalRow],
+                                        readSchema: StructType,
+                                        targetSchema: StructType,
+                                        vectorCols: Map[Int, 
HoodieSchema.Vector]): Iterator[InternalRow] = {
+    val vectorProjection = UnsafeProjection.create(targetSchema)
+    val javaVectorCols: java.util.Map[Integer, HoodieSchema.Vector] =
+      vectorCols.map { case (k, v) => (Integer.valueOf(k), v) }.asJava
+    val mapper = VectorConversionUtils.buildRowMapper(readSchema, 
javaVectorCols, vectorProjection.apply(_))
+    iter.map(mapper.apply(_))
+  }
+
   // executor
   private def readBaseFile(file: PartitionedFile, parquetFileReader: 
SparkColumnarFileReader, requestedSchema: StructType,
                            remainingPartitionSchema: StructType, 
fixedPartitionIndexes: Set[Int], requiredSchema: StructType,
                            partitionSchema: StructType, outputSchema: 
StructType, filters: Seq[Filter],
                            storageConf: StorageConfiguration[Configuration]): 
Iterator[InternalRow] = {
-    if (remainingPartitionSchema.fields.length == 
partitionSchema.fields.length) {
+    // Detect vector columns and create modified schemas with BinaryType.
+    // Each schema is detected independently because ordinals are relative to 
the schema being
+    // modified — outputSchema and requestedSchema may have vector columns at 
different positions
+    // than requiredSchema (e.g. when partition columns are interleaved).
+    val (modifiedRequiredSchema, vectorCols) = 
withVectorRewrite(requiredSchema)
+    val hasVectors = vectorCols.nonEmpty
+    val (modifiedOutputSchema, outputVectorCols) = if (hasVectors) 
withVectorRewrite(outputSchema) else (outputSchema, Map.empty[Int, 
HoodieSchema.Vector])
+    val (modifiedRequestedSchema, _) = if (hasVectors) 
withVectorRewrite(requestedSchema) else (requestedSchema, Map.empty[Int, 
HoodieSchema.Vector])
+
+    val rawIter = if (remainingPartitionSchema.fields.length == 
partitionSchema.fields.length) {
       //none of partition fields are read from the file, so the reader will do 
the appending for us
-      parquetFileReader.read(file, requiredSchema, partitionSchema, 
internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType)
+      parquetFileReader.read(file, modifiedRequiredSchema, partitionSchema, 
internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType)
     } else if (remainingPartitionSchema.fields.length == 0) {
       //we read all of the partition fields from the file
       val pfileUtils = sparkAdapter.getSparkPartitionedFileUtils
       //we need to modify the partitioned file so that the partition values 
are empty
       val modifiedFile = pfileUtils.createPartitionedFile(InternalRow.empty, 
pfileUtils.getPathFromPartitionedFile(file), file.start, file.length)
       //and we pass an empty schema for the partition schema
-      parquetFileReader.read(modifiedFile, outputSchema, new StructType(), 
internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType)
+      parquetFileReader.read(modifiedFile, modifiedOutputSchema, new 
StructType(), internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType)
     } else {
       //need to do an additional projection here. The case in mind is that 
partition schema is "a,b,c" mandatoryFields is "a,c",
       //then we will read (dataSchema + a + c) and append b. So the final 
schema will be (data schema + a + c +b)
@@ -420,8 +484,20 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
       val pfileUtils = sparkAdapter.getSparkPartitionedFileUtils
       val partitionValues = getFixedPartitionValues(file.partitionValues, 
partitionSchema, fixedPartitionIndexes)
       val modifiedFile = pfileUtils.createPartitionedFile(partitionValues, 
pfileUtils.getPathFromPartitionedFile(file), file.start, file.length)
-      val iter = parquetFileReader.read(modifiedFile, requestedSchema, 
remainingPartitionSchema, internalSchemaOpt, filters, storageConf, 
tableSchemaAsMessageType)
-      projectIter(iter, StructType(requestedSchema.fields ++ 
remainingPartitionSchema.fields), outputSchema)
+      val iter = parquetFileReader.read(modifiedFile, modifiedRequestedSchema, 
remainingPartitionSchema, internalSchemaOpt, filters, storageConf, 
tableSchemaAsMessageType)
+      projectIter(iter, StructType(modifiedRequestedSchema.fields ++ 
remainingPartitionSchema.fields), modifiedOutputSchema)
+    }
+
+    if (hasVectors) {
+      // The raw iterator has BinaryType for vector columns; convert back to 
ArrayType
+      val readSchema = if (remainingPartitionSchema.fields.length == 
partitionSchema.fields.length) {
+        StructType(modifiedRequiredSchema.fields ++ partitionSchema.fields)
+      } else {
+        modifiedOutputSchema
+      }
+      wrapWithVectorConversion(rawIter, readSchema, outputSchema, 
outputVectorCols)
+    } else {
+      rawIter
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
new file mode 100644
index 000000000000..0522950f0d9f
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.util.HadoopInputFile
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.types._
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.Assertions._
+
+import scala.collection.JavaConverters._
+
+/**
+ * End-to-end tests for vector column support in Hudi.
+ * Tests round-trip data correctness through Spark DataFrames.
+ */
+class TestVectorDataSource extends HoodieSparkClientTestBase {
+
+  var spark: SparkSession = null
+
+  @BeforeEach override def setUp(): Unit = {
+    initPath()
+    initSparkContexts()
+    spark = sqlContext.sparkSession
+    initTestDataGenerator()
+    initHoodieStorage()
+  }
+
+  @AfterEach override def tearDown(): Unit = {
+    cleanupSparkContexts()
+    cleanupTestDataGenerator()
+    cleanupFileSystem()
+  }
+
+  private def vectorMetadata(descriptor: String): Metadata =
+    new MetadataBuilder().putString(HoodieSchema.TYPE_METADATA_FIELD, 
descriptor).build()
+
+  private def createVectorDf(schema: StructType, data: Seq[Row]): DataFrame =
+    spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+
+  private def writeHudiTable(df: DataFrame, tableName: String, path: String,
+      tableType: String = "COPY_ON_WRITE", precombineField: String = "id",
+      mode: SaveMode = SaveMode.Overwrite, extraOpts: Map[String, String] = 
Map.empty): Unit = {
+    var writer = df.write.format("hudi")
+      .option(RECORDKEY_FIELD.key, "id")
+      .option(PRECOMBINE_FIELD.key, precombineField)
+      .option(TABLE_NAME.key, tableName)
+      .option(TABLE_TYPE.key, tableType)
+    extraOpts.foreach { case (k, v) => writer = writer.option(k, v) }
+    writer.mode(mode).save(path)
+  }
+
+  private def readHudiTable(path: String): DataFrame =
+    spark.read.format("hudi").load(path)
+
+  @Test
+  def testVectorRoundTrip(): Unit = {
+    // 1. Create schema with vector metadata
+    val metadata = vectorMetadata("VECTOR(128)")
+
+    val schema = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("embedding", ArrayType(FloatType, containsNull = false),
+        nullable = false, metadata),
+      StructField("label", StringType, nullable = true)
+    ))
+
+    // 2. Generate test data (128-dim float vectors)
+    val random = new scala.util.Random(42)
+    val data = (0 until 100).map { i =>
+      val embedding = Array.fill(128)(random.nextFloat())
+      Row(s"key_$i", embedding.toSeq, s"label_$i")
+    }
+
+    val df = createVectorDf(schema, data)
+
+    // 3. Write as COW Hudi table
+    writeHudiTable(df, "vector_test_table", basePath)
+
+    // 4. Read back
+    val readDf = readHudiTable(basePath)
+
+    // 5. Verify row count
+    assertEquals(100, readDf.count())
+
+    // 6. Verify schema preserved
+    val embeddingField = readDf.schema("embedding")
+    assertTrue(embeddingField.dataType.isInstanceOf[ArrayType])
+    val arrayType = embeddingField.dataType.asInstanceOf[ArrayType]
+    assertEquals(FloatType, arrayType.elementType)
+    assertFalse(arrayType.containsNull)
+
+    // 7. Verify vector metadata preserved
+    val readMetadata = embeddingField.metadata
+    assertTrue(readMetadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+    val parsedSchema = HoodieSchema.parseTypeDescriptor(
+      readMetadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+    assertEquals(HoodieSchemaType.VECTOR, parsedSchema.getType)
+    val vectorSchema = parsedSchema.asInstanceOf[HoodieSchema.Vector]
+    assertEquals(128, vectorSchema.getDimension)
+
+    // 8. Verify float values match exactly
+    val originalRows = df.select("id", "embedding").collect()
+      .map(r => (r.getString(0), r.getSeq[Float](1)))
+      .toMap
+
+    val readRows = readDf.select("id", "embedding").collect()
+      .map(r => (r.getString(0), r.getSeq[Float](1)))
+      .toMap
+
+    originalRows.foreach { case (id, origEmbedding) =>
+      val readEmbedding = readRows(id)
+      assertEquals(128, readEmbedding.size, s"Vector size mismatch for $id")
+
+      origEmbedding.zip(readEmbedding).zipWithIndex.foreach {
+        case ((orig, read), idx) =>
+          assertEquals(orig, read, 1e-9f,
+            s"Vector mismatch at $id index $idx: orig=$orig read=$read")
+      }
+    }
+  }
+
+
+  @Test
+  def testNullableVectorField(): Unit = {
+    // Vector column itself nullable (entire array can be null)
+    val metadata = vectorMetadata("VECTOR(32)")
+
+    val schema = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("embedding", ArrayType(FloatType, containsNull = false),
+        nullable = true, metadata) // nullable = true
+    ))
+
+    val data = Seq(
+      Row("key_1", Array.fill(32)(0.5f).toSeq),
+      Row("key_2", null), // null vector
+      Row("key_3", Array.fill(32)(1.0f).toSeq)
+    )
+
+    val df = createVectorDf(schema, data)
+
+    writeHudiTable(df, "nullable_vector_test", basePath + "/nullable")
+
+    val readDf = readHudiTable(basePath + "/nullable")
+    val readRows = readDf.select("id", "embedding").collect()
+
+    // Verify null handling
+    val key2Row = readRows.find(_.getString(0) == "key_2").get
+    assertTrue(key2Row.isNullAt(1), "Null vector not preserved")
+
+    // Verify non-null vectors preserved correctly
+    val key1Row = readRows.find(_.getString(0) == "key_1").get
+    assertFalse(key1Row.isNullAt(1))
+    val key1Embedding = key1Row.getSeq[Float](1)
+    assertEquals(32, key1Embedding.size)
+    assertTrue(key1Embedding.forall(_ == 0.5f))
+
+    val key3Row = readRows.find(_.getString(0) == "key_3").get
+    assertFalse(key3Row.isNullAt(1))
+    val key3Embedding = key3Row.getSeq[Float](1)
+    assertEquals(32, key3Embedding.size)
+    assertTrue(key3Embedding.forall(_ == 1.0f))
+  }
+
+  @Test
+  def testColumnProjectionWithVector(): Unit = {
+    val metadata = vectorMetadata("VECTOR(16)")
+
+    val schema = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("embedding", ArrayType(FloatType, containsNull = false),
+        nullable = false, metadata),
+      StructField("label", StringType, nullable = true),
+      StructField("score", IntegerType, nullable = true)
+    ))
+
+    val data = (0 until 10).map { i =>
+      Row(s"key_$i", Array.fill(16)(i.toFloat).toSeq, s"label_$i", i * 10)
+    }
+
+    val df = createVectorDf(schema, data)
+
+    writeHudiTable(df, "projection_vector_test", basePath + "/projection")
+
+    // Read only non-vector columns (vector column excluded)
+    val nonVectorDf = readHudiTable(basePath + "/projection")
+      .select("id", "label", "score")
+    assertEquals(10, nonVectorDf.count())
+    val row0 = nonVectorDf.filter("id = 'key_0'").collect()(0)
+    assertEquals("label_0", row0.getString(1))
+    assertEquals(0, row0.getInt(2))
+
+    // Read only the vector column with id
+    val vectorOnlyDf = readHudiTable(basePath + "/projection")
+      .select("id", "embedding")
+    assertEquals(10, vectorOnlyDf.count())
+    val vecRow = vectorOnlyDf.filter("id = 'key_5'").collect()(0)
+    val embedding = vecRow.getSeq[Float](1)
+    assertEquals(16, embedding.size)
+    assertTrue(embedding.forall(_ == 5.0f))
+
+    // Read all columns including vector
+    val allDf = readHudiTable(basePath + "/projection")
+      .select("id", "embedding", "label", "score")
+    assertEquals(10, allDf.count())
+    val allRow = allDf.filter("id = 'key_3'").collect()(0)
+    assertEquals("label_3", allRow.getString(2))
+    assertEquals(30, allRow.getInt(3))
+    val allEmbedding = allRow.getSeq[Float](1)
+    assertEquals(16, allEmbedding.size)
+    assertTrue(allEmbedding.forall(_ == 3.0f))
+  }
+
+  @Test
+  def testDoubleVectorRoundTrip(): Unit = {
+    val metadata = vectorMetadata("VECTOR(64, DOUBLE)")
+
+    val schema = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("embedding", ArrayType(DoubleType, containsNull = false),
+        nullable = false, metadata),
+      StructField("label", StringType, nullable = true)
+    ))
+
+    val random = new scala.util.Random(123)
+    val data = (0 until 50).map { i =>
+      val embedding = Array.fill(64)(random.nextDouble())
+      Row(s"key_$i", embedding.toSeq, s"label_$i")
+    }
+
+    val df = createVectorDf(schema, data)
+
+    writeHudiTable(df, "double_vector_test", basePath + "/double_vec")
+
+    val readDf = readHudiTable(basePath + "/double_vec")
+    assertEquals(50, readDf.count())
+
+    // Verify schema: ArrayType(DoubleType)
+    val embField = readDf.schema("embedding")
+    val arrType = embField.dataType.asInstanceOf[ArrayType]
+    assertEquals(DoubleType, arrType.elementType)
+
+    // Verify metadata preserved with DOUBLE element type
+    val readMeta = embField.metadata
+    assertTrue(readMeta.contains(HoodieSchema.TYPE_METADATA_FIELD))
+    val parsed = HoodieSchema.parseTypeDescriptor(
+      readMeta.getString(HoodieSchema.TYPE_METADATA_FIELD))
+    assertEquals(HoodieSchemaType.VECTOR, parsed.getType)
+    val vecSchema = parsed.asInstanceOf[HoodieSchema.Vector]
+    assertEquals(64, vecSchema.getDimension)
+    assertEquals(HoodieSchema.Vector.VectorElementType.DOUBLE, 
vecSchema.getVectorElementType)
+
+    // Verify actual values
+    val origMap = df.select("id", "embedding").collect()
+      .map(r => (r.getString(0), r.getSeq[Double](1))).toMap
+    val readMap = readDf.select("id", "embedding").collect()
+      .map(r => (r.getString(0), r.getSeq[Double](1))).toMap
+
+    origMap.foreach { case (id, orig) =>
+      val read = readMap(id)
+      assertEquals(64, read.size, s"Dimension mismatch for $id")
+      orig.zip(read).zipWithIndex.foreach { case ((o, r), idx) =>
+        assertEquals(o, r, 1e-15, s"Double mismatch at $id[$idx]")
+      }
+    }
+  }
+
+  @Test
+  def testInt8VectorRoundTrip(): Unit = {
+    val metadata = vectorMetadata("VECTOR(256, INT8)")
+
+    val schema = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("embedding", ArrayType(ByteType, containsNull = false),
+        nullable = false, metadata)
+    ))
+
+    val random = new scala.util.Random(99)
+    val data = (0 until 30).map { i =>
+      val embedding = Array.fill(256)((random.nextInt(256) - 128).toByte)
+      Row(s"key_$i", embedding.toSeq)
+    }
+
+    val df = createVectorDf(schema, data)
+
+    writeHudiTable(df, "int8_vector_test", basePath + "/int8_vec")
+
+    val readDf = readHudiTable(basePath + "/int8_vec")
+    assertEquals(30, readDf.count())
+
+    // Verify schema: ArrayType(ByteType)
+    val embField = readDf.schema("embedding")
+    val arrType = embField.dataType.asInstanceOf[ArrayType]
+    assertEquals(ByteType, arrType.elementType)
+
+    // Verify metadata
+    val readMeta = embField.metadata
+    val parsed = HoodieSchema.parseTypeDescriptor(
+      readMeta.getString(HoodieSchema.TYPE_METADATA_FIELD))
+    assertEquals(HoodieSchemaType.VECTOR, parsed.getType)
+    val vecSchema = parsed.asInstanceOf[HoodieSchema.Vector]
+    assertEquals(256, vecSchema.getDimension)
+    assertEquals(HoodieSchema.Vector.VectorElementType.INT8, 
vecSchema.getVectorElementType)
+
+    // Verify byte values
+    val origMap = df.select("id", "embedding").collect()
+      .map(r => (r.getString(0), r.getSeq[Byte](1))).toMap
+    val readMap = readDf.select("id", "embedding").collect()
+      .map(r => (r.getString(0), r.getSeq[Byte](1))).toMap
+
+    origMap.foreach { case (id, orig) =>
+      val read = readMap(id)
+      assertEquals(256, read.size, s"Dimension mismatch for $id")
+      assertArrayEquals(orig.toArray, read.toArray, s"INT8 vector mismatch for 
$id")
+    }
+  }
+
+  @Test
+  def testMultipleVectorColumns(): Unit = {
+    val floatMeta = vectorMetadata("VECTOR(8)")
+    val doubleMeta = vectorMetadata("VECTOR(4, DOUBLE)")
+
+    val schema = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("vec_float", ArrayType(FloatType, containsNull = false),
+        nullable = false, floatMeta),
+      StructField("label", StringType, nullable = true),
+      StructField("vec_double", ArrayType(DoubleType, containsNull = false),
+        nullable = true, doubleMeta)
+    ))
+
+    val data = (0 until 20).map { i =>
+      Row(
+        s"key_$i",
+        Array.fill(8)(i.toFloat).toSeq,
+        s"label_$i",
+        if (i % 3 == 0) null else Array.fill(4)(i.toDouble).toSeq
+      )
+    }
+
+    val df = createVectorDf(schema, data)
+
+    writeHudiTable(df, "multi_vector_test", basePath + "/multi_vec")
+
+    val readDf = readHudiTable(basePath + "/multi_vec")
+    assertEquals(20, readDf.count())
+
+    // Verify both vector columns present with correct types
+    val floatField = readDf.schema("vec_float")
+    assertEquals(FloatType, 
floatField.dataType.asInstanceOf[ArrayType].elementType)
+    val doubleField = readDf.schema("vec_double")
+    assertEquals(DoubleType, 
doubleField.dataType.asInstanceOf[ArrayType].elementType)
+
+    // Verify data: row with both vectors
+    val row5 = readDf.select("id", "vec_float", "vec_double")
+      .filter("id = 'key_5'").collect()(0)
+    val fVec = row5.getSeq[Float](1)
+    assertEquals(8, fVec.size)
+    assertTrue(fVec.forall(_ == 5.0f))
+    val dVec = row5.getSeq[Double](2)
+    assertEquals(4, dVec.size)
+    assertTrue(dVec.forall(_ == 5.0))
+
+    // Verify data: row with null double vector (i=0, i%3==0)
+    val row0 = readDf.select("id", "vec_float", "vec_double")
+      .filter("id = 'key_0'").collect()(0)
+    assertFalse(row0.isNullAt(1))
+    assertTrue(row0.isNullAt(2), "Expected null double vector for key_0")
+
+    // Verify projection: select only one vector column
+    val floatOnlyDf = readDf.select("id", "vec_float")
+    assertEquals(20, floatOnlyDf.count())
+    val doubleOnlyDf = readDf.select("id", "vec_double")
+    assertEquals(20, doubleOnlyDf.count())
+  }
+
+  @Test
+  def testMorTableWithVectors(): Unit = {
+    val metadata = vectorMetadata("VECTOR(16)")
+
+    val schema = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("embedding", ArrayType(FloatType, containsNull = false),
+        nullable = false, metadata),
+      StructField("ts", LongType, nullable = false)
+    ))
+
+    // Initial insert
+    val data1 = (0 until 20).map { i =>
+      Row(s"key_$i", Array.fill(16)(1.0f).toSeq, i.toLong)
+    }
+
+    val df1 = createVectorDf(schema, data1)
+
+    writeHudiTable(df1, "mor_vector_test", basePath + "/mor_vec",
+      tableType = "MERGE_ON_READ", precombineField = "ts")
+
+    // Upsert: update some vectors with new values
+    val data2 = (0 until 10).map { i =>
+      Row(s"key_$i", Array.fill(16)(2.0f).toSeq, 100L + i)
+    }
+
+    val df2 = createVectorDf(schema, data2)
+
+    writeHudiTable(df2, "mor_vector_test", basePath + "/mor_vec",
+      tableType = "MERGE_ON_READ", precombineField = "ts", mode = 
SaveMode.Append)
+
+    // Read the merged view
+    val readDf = readHudiTable(basePath + "/mor_vec")
+    assertEquals(20, readDf.count())
+
+    // Updated rows (key_0 through key_9) should have new vectors
+    val updatedRow = readDf.select("id", "embedding")
+      .filter("id = 'key_5'").collect()(0)
+    val updatedVec = updatedRow.getSeq[Float](1)
+    assertEquals(16, updatedVec.size)
+    assertTrue(updatedVec.forall(_ == 2.0f), "Updated vector should have value 
2.0")
+
+    // Non-updated rows (key_10 through key_19) should keep original vectors
+    val origRow = readDf.select("id", "embedding")
+      .filter("id = 'key_15'").collect()(0)
+    val origVec = origRow.getSeq[Float](1)
+    assertEquals(16, origVec.size)
+    assertTrue(origVec.forall(_ == 1.0f), "Non-updated vector should have 
value 1.0")
+  }
+
+  @Test
+  def testCowUpsertWithVectors(): Unit = {
+    val metadata = vectorMetadata("VECTOR(8)")
+
+    val schema = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("embedding", ArrayType(FloatType, containsNull = false),
+        nullable = false, metadata),
+      StructField("ts", LongType, nullable = false),
+      StructField("name", StringType, nullable = true)
+    ))
+
+    // Initial write
+    val data1 = (0 until 10).map { i =>
+      Row(s"key_$i", Array.fill(8)(0.0f).toSeq, i.toLong, s"name_$i")
+    }
+
+    writeHudiTable(createVectorDf(schema, data1), "cow_upsert_vec_test",
+      basePath + "/cow_upsert", precombineField = "ts")
+
+    // Upsert: update vectors for existing keys + add new keys
+    val data2 = Seq(
+      Row("key_0", Array.fill(8)(9.9f).toSeq, 100L, "updated_0"),
+      Row("key_5", Array.fill(8)(5.5f).toSeq, 100L, "updated_5"),
+      Row("key_10", Array.fill(8)(10.0f).toSeq, 100L, "new_10")
+    )
+
+    writeHudiTable(createVectorDf(schema, data2), "cow_upsert_vec_test",
+      basePath + "/cow_upsert", precombineField = "ts", mode = SaveMode.Append)
+
+    val readDf = readHudiTable(basePath + "/cow_upsert")
+    assertEquals(11, readDf.count())
+
+    // Verify updated key_0
+    val r0 = readDf.select("id", "embedding", "name")
+      .filter("id = 'key_0'").collect()(0)
+    assertTrue(r0.getSeq[Float](1).forall(_ == 9.9f))
+    assertEquals("updated_0", r0.getString(2))
+
+    // Verify non-updated key_3
+    val r3 = readDf.select("id", "embedding", "name")
+      .filter("id = 'key_3'").collect()(0)
+    assertTrue(r3.getSeq[Float](1).forall(_ == 0.0f))
+    assertEquals("name_3", r3.getString(2))
+
+    // Verify new key_10
+    val r10 = readDf.select("id", "embedding", "name")
+      .filter("id = 'key_10'").collect()(0)
+    assertTrue(r10.getSeq[Float](1).forall(_ == 10.0f))
+    assertEquals("new_10", r10.getString(2))
+  }
+
+  @Test
+  def testLargeDimensionVector(): Unit = {
+    val metadata = vectorMetadata("VECTOR(1536)")
+
+    val schema = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("embedding", ArrayType(FloatType, containsNull = false),
+        nullable = false, metadata)
+    ))
+
+    val random = new scala.util.Random(7)
+    val data = (0 until 5).map { i =>
+      Row(s"key_$i", Array.fill(1536)(random.nextFloat()).toSeq)
+    }
+
+    val df = createVectorDf(schema, data)
+
+    writeHudiTable(df, "large_dim_vec_test", basePath + "/large_dim")
+
+    val readDf = readHudiTable(basePath + "/large_dim")
+    assertEquals(5, readDf.count())
+
+    // Verify dimension preserved
+    val readMeta = readDf.schema("embedding").metadata
+    val vecSchema = HoodieSchema.parseTypeDescriptor(
+      
readMeta.getString(HoodieSchema.TYPE_METADATA_FIELD)).asInstanceOf[HoodieSchema.Vector]
+    assertEquals(1536, vecSchema.getDimension)
+
+    // Verify values
+    val origMap = df.select("id", "embedding").collect()
+      .map(r => (r.getString(0), r.getSeq[Float](1))).toMap
+    val readMap = readDf.select("id", "embedding").collect()
+      .map(r => (r.getString(0), r.getSeq[Float](1))).toMap
+
+    origMap.foreach { case (id, orig) =>
+      val read = readMap(id)
+      assertEquals(1536, read.size)
+      orig.zip(read).foreach { case (o, r) =>
+        assertEquals(o, r, 1e-9f, s"Mismatch in $id")
+      }
+    }
+  }
+
+  @Test
+  def testSmallDimensionVector(): Unit = {
+    val metadata = vectorMetadata("VECTOR(2)")
+
+    val schema = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("coords", ArrayType(FloatType, containsNull = false),
+        nullable = false, metadata)
+    ))
+
+    val data = Seq(
+      Row("a", Seq(1.0f, 2.0f)),
+      Row("b", Seq(-1.5f, 3.14f)),
+      Row("c", Seq(0.0f, Float.MaxValue))
+    )
+
+    val df = createVectorDf(schema, data)
+
+    writeHudiTable(df, "small_dim_test", basePath + "/small_dim")
+
+    val readDf = readHudiTable(basePath + "/small_dim")
+    assertEquals(3, readDf.count())
+
+    val rowA = readDf.select("id", "coords").filter("id = 'a'").collect()(0)
+    val coordsA = rowA.getSeq[Float](1)
+    assertEquals(2, coordsA.size)
+    assertEquals(1.0f, coordsA(0), 1e-9f)
+    assertEquals(2.0f, coordsA(1), 1e-9f)
+
+    val rowC = readDf.select("id", "coords").filter("id = 'c'").collect()(0)
+    val coordsC = rowC.getSeq[Float](1)
+    assertEquals(Float.MaxValue, coordsC(1), 1e-30f)
+  }
+
+  @Test
+  def testVectorWithNonVectorArrayColumn(): Unit = {
+    val vectorMeta = vectorMetadata("VECTOR(4)")
+
+    val schema = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("embedding", ArrayType(FloatType, containsNull = false),
+        nullable = false, vectorMeta),
+      StructField("tags", ArrayType(StringType, containsNull = true),
+        nullable = true)
+    ))
+
+    val data = Seq(
+      Row("k1", Seq(1.0f, 2.0f, 3.0f, 4.0f), Seq("tag1", "tag2")),
+      Row("k2", Seq(5.0f, 6.0f, 7.0f, 8.0f), null),
+      Row("k3", Seq(0.1f, 0.2f, 0.3f, 0.4f), Seq("tag3"))
+    )
+
+    val df = createVectorDf(schema, data)
+
+    writeHudiTable(df, "mixed_array_test", basePath + "/mixed_array")
+
+    val readDf = readHudiTable(basePath + "/mixed_array")
+    assertEquals(3, readDf.count())
+
+    // Vector column should be ArrayType(FloatType) with vector metadata
+    val embField = readDf.schema("embedding")
+    assertTrue(embField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+    assertEquals(FloatType, 
embField.dataType.asInstanceOf[ArrayType].elementType)
+
+    // Non-vector array column should be ArrayType(StringType) without vector 
metadata
+    val tagsField = readDf.schema("tags")
+    assertFalse(tagsField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+    assertEquals(StringType, 
tagsField.dataType.asInstanceOf[ArrayType].elementType)
+
+    // Verify vector data preserved
+    val row1 = readDf.select("id", "embedding", "tags")
+      .filter("id = 'k1'").collect()(0)
+    val emb = row1.getSeq[Float](1)
+    assertEquals(Seq(1.0f, 2.0f, 3.0f, 4.0f), emb)
+    assertEquals(Seq("tag1", "tag2"), row1.getSeq[String](2))
+
+    // Verify null tags preserved
+    val row2 = readDf.select("id", "embedding", "tags")
+      .filter("id = 'k2'").collect()(0)
+    assertFalse(row2.isNullAt(1))
+    assertTrue(row2.isNullAt(2))
+  }
+
+  @Test
+  def testMorWithMultipleUpserts(): Unit = {
+    val metadata = vectorMetadata("VECTOR(4, DOUBLE)")
+
+    val schema = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("embedding", ArrayType(DoubleType, containsNull = false),
+        nullable = false, metadata),
+      StructField("ts", LongType, nullable = false)
+    ))
+
+    // Insert batch 1
+    val batch1 = (0 until 10).map { i =>
+      Row(s"key_$i", Array.fill(4)(1.0).toSeq, 1L)
+    }
+    writeHudiTable(createVectorDf(schema, batch1), "mor_multi_upsert_test",
+      basePath + "/mor_multi", tableType = "MERGE_ON_READ", precombineField = 
"ts")
+
+    // Upsert batch 2: update key_0..key_4
+    val batch2 = (0 until 5).map { i =>
+      Row(s"key_$i", Array.fill(4)(2.0).toSeq, 2L)
+    }
+    writeHudiTable(createVectorDf(schema, batch2), "mor_multi_upsert_test",
+      basePath + "/mor_multi", tableType = "MERGE_ON_READ", precombineField = 
"ts",
+      mode = SaveMode.Append)
+
+    // Upsert batch 3: update key_0..key_2 again
+    val batch3 = (0 until 3).map { i =>
+      Row(s"key_$i", Array.fill(4)(3.0).toSeq, 3L)
+    }
+    writeHudiTable(createVectorDf(schema, batch3), "mor_multi_upsert_test",
+      basePath + "/mor_multi", tableType = "MERGE_ON_READ", precombineField = 
"ts",
+      mode = SaveMode.Append)
+
+    val readDf = readHudiTable(basePath + "/mor_multi")
+    assertEquals(10, readDf.count())
+
+    // key_0: updated 3 times → should have value 3.0
+    val r0 = readDf.select("id", "embedding").filter("id = 
'key_0'").collect()(0)
+    assertTrue(r0.getSeq[Double](1).forall(_ == 3.0), "key_0 should have 
latest value 3.0")
+
+    // key_3: updated once (batch 2) → should have value 2.0
+    val r3 = readDf.select("id", "embedding").filter("id = 
'key_3'").collect()(0)
+    assertTrue(r3.getSeq[Double](1).forall(_ == 2.0), "key_3 should have value 
2.0")
+
+    // key_7: never updated → should have value 1.0
+    val r7 = readDf.select("id", "embedding").filter("id = 
'key_7'").collect()(0)
+    assertTrue(r7.getSeq[Double](1).forall(_ == 1.0), "key_7 should have 
original value 1.0")
+  }
+
+  @Test
+  def testDimensionMismatchOnWrite(): Unit = {
+    // Schema declares VECTOR(8) but data has arrays of length 4
+    val metadata = vectorMetadata("VECTOR(8)")
+
+    val schema = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("embedding", ArrayType(FloatType, containsNull = false),
+        nullable = false, metadata)
+    ))
+
+    val data = Seq(
+      Row("key_1", Seq(1.0f, 2.0f, 3.0f, 4.0f)) // only 4 elements, schema 
says 8
+    )
+
+    val df = createVectorDf(schema, data)
+
+    val ex = assertThrows(classOf[Exception], () => {
+      writeHudiTable(df, "dim_mismatch_test", basePath + "/dim_mismatch")
+    })
+    // The root cause should mention dimension mismatch
+    var cause: Throwable = ex
+    var foundMismatch = false
+    while (cause != null && !foundMismatch) {
+      if (cause.getMessage != null && cause.getMessage.contains("dimension 
mismatch")) {
+        foundMismatch = true
+      }
+      cause = cause.getCause
+    }
+    assertTrue(foundMismatch,
+      s"Expected 'dimension mismatch' in exception chain, got: 
${ex.getMessage}")
+  }
+
+  @Test
+  def testSchemaEvolutionRejectsDimensionChange(): Unit = {
+    // Write initial table with VECTOR(4)
+    val metadata4 = vectorMetadata("VECTOR(4)")
+
+    val schema4 = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("embedding", ArrayType(FloatType, containsNull = false),
+        nullable = false, metadata4),
+      StructField("ts", LongType, nullable = false)
+    ))
+
+    val data1 = Seq(Row("key_1", Seq(1.0f, 2.0f, 3.0f, 4.0f), 1L))
+    writeHudiTable(createVectorDf(schema4, data1), "schema_evolve_dim_test",
+      basePath + "/schema_evolve_dim", precombineField = "ts")
+
+    // Now try to write with VECTOR(8) — different dimension should be rejected
+    val metadata8 = vectorMetadata("VECTOR(8)")
+
+    val schema8 = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("embedding", ArrayType(FloatType, containsNull = false),
+        nullable = false, metadata8),
+      StructField("ts", LongType, nullable = false)
+    ))
+
+    val data2 = Seq(Row("key_2", Seq(1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f, 7.0f, 
8.0f), 2L))
+
+    assertThrows(classOf[Exception], () => {
+      writeHudiTable(createVectorDf(schema8, data2), "schema_evolve_dim_test",
+        basePath + "/schema_evolve_dim", precombineField = "ts", mode = 
SaveMode.Append)
+    })
+  }
+
+  /**
+   * Verifies that vector column metadata is written to the Parquet file footer
+   * under the key hoodie.vector.columns.
+   */
+  @Test
+  def testParquetFooterContainsVectorMetadata(): Unit = {
+    val metadata = vectorMetadata("VECTOR(8)")
+
+    val schema = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("embedding", ArrayType(FloatType, containsNull = false),
+        nullable = false, metadata)
+    ))
+
+    val data = Seq(Row("key_1", Array.fill(8)(1.0f).toSeq))
+    writeHudiTable(createVectorDf(schema, data), "footer_meta_test", basePath 
+ "/footer_meta")
+
+    // Find a .parquet base file and read its footer metadata
+    val conf = spark.sessionState.newHadoopConf()
+    val fs = new Path(basePath + "/footer_meta").getFileSystem(conf)
+    val parquetFiles = fs.listStatus(new Path(basePath + "/footer_meta"))
+      .flatMap(d => Option(fs.listStatus(d.getPath)).getOrElse(Array.empty))
+      .filter(f => f.getPath.getName.endsWith(".parquet") && 
!f.getPath.getName.startsWith("."))
+
+    assertTrue(parquetFiles.nonEmpty, "Expected at least one parquet file")
+
+    val reader = 
ParquetFileReader.open(HadoopInputFile.fromPath(parquetFiles.head.getPath, 
conf))
+    try {
+      val footerMeta = reader.getFileMetaData.getKeyValueMetaData.asScala
+      
assertTrue(footerMeta.contains(HoodieSchema.PARQUET_VECTOR_COLUMNS_METADATA_KEY),
+        s"Footer should contain 
${HoodieSchema.PARQUET_VECTOR_COLUMNS_METADATA_KEY}, got keys: 
${footerMeta.keys.mkString(", ")}")
+
+      val value = footerMeta(HoodieSchema.PARQUET_VECTOR_COLUMNS_METADATA_KEY)
+      assertTrue(value.contains("embedding"), s"Footer value should reference 
'embedding' column, got: $value")
+      assertTrue(value.contains("VECTOR"), s"Footer value should contain 
'VECTOR' descriptor, got: $value")
+    } finally {
+      reader.close()
+    }
+  }
+
+  @Test
+  def testPartitionedTableWithVector(): Unit = {
+    val metadata = vectorMetadata("VECTOR(4)")
+
+    val schema = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("embedding", ArrayType(FloatType, containsNull = false),
+        nullable = false, metadata),
+      StructField("label", StringType, nullable = true),
+      StructField("category", StringType, nullable = false)
+    ))
+
+    // Two partitions: "catA" and "catB"
+    val data = (0 until 10).map { i =>
+      val category = if (i % 2 == 0) "catA" else "catB"
+      Row(s"key_$i", Array.fill(4)(i.toFloat).toSeq, s"label_$i", category)
+    }
+
+    writeHudiTable(createVectorDf(schema, data), "partitioned_vector_test",
+      basePath + "/partitioned",
+      extraOpts = Map("hoodie.datasource.write.partitionpath.field" -> 
"category"))
+
+    val readDf = readHudiTable(basePath + "/partitioned")
+    assertEquals(10, readDf.count())
+
+    // Collect all rows and verify each row's vector matches its key
+    val rowMap = readDf.select("id", "embedding", "category").collect()
+      .map(r => r.getString(0) -> (r.getSeq[Float](1), r.getString(2)))
+      .toMap
+
+    for (i <- 0 until 10) {
+      val (vec, cat) = rowMap(s"key_$i")
+      val expectedCat = if (i % 2 == 0) "catA" else "catB"
+      assertEquals(4, vec.size, s"key_$i dimension wrong")
+      assertTrue(vec.forall(_ == i.toFloat),
+        s"key_$i: expected ${i.toFloat} but got ${vec.head} (ordinal 
mismatch?)")
+      assertEquals(expectedCat, cat, s"key_$i partition value wrong")
+    }
+
+    // Also verify projection of vector-only across partitions
+    val vecOnly = readDf.select("id", "embedding").collect()
+      .map(r => r.getString(0) -> r.getSeq[Float](1)).toMap
+    for (i <- 0 until 10) {
+      assertTrue(vecOnly(s"key_$i").forall(_ == i.toFloat),
+        s"key_$i projected vector wrong")
+    }
+  }
+
+  @Test
+  def testVectorAsLastColumn(): Unit = {
+    val metadata = vectorMetadata("VECTOR(4)")
+
+    // Vector is at position 4 (last), after several non-vector columns
+    val schema = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("col_a", IntegerType, nullable = true),
+      StructField("col_b", StringType, nullable = true),
+      StructField("col_c", DoubleType, nullable = true),
+      StructField("embedding", ArrayType(FloatType, containsNull = false),
+        nullable = false, metadata)
+    ))
+
+    val data = (0 until 10).map { i =>
+      Row(s"key_$i", i, s"str_$i", i.toDouble * 1.5, 
Array.fill(4)(i.toFloat).toSeq)
+    }
+
+    writeHudiTable(createVectorDf(schema, data), "last_col_vector_test", 
basePath + "/last_col")
+
+    val readDf = readHudiTable(basePath + "/last_col")
+    assertEquals(10, readDf.count())
+
+    // Read all columns: verify vector and non-vector columns correct
+    val allRows = readDf.select("id", "col_a", "col_b", "embedding").collect()
+      .map(r => r.getString(0) -> r).toMap
+
+    for (i <- 0 until 10) {
+      val row = allRows(s"key_$i")
+      assertEquals(i, row.getInt(1), s"col_a wrong for key_$i")
+      assertEquals(s"str_$i", row.getString(2), s"col_b wrong for key_$i")
+      val vec = row.getSeq[Float](3)
+      assertEquals(4, vec.size, s"key_$i dimension wrong")
+      assertTrue(vec.forall(_ == i.toFloat),
+        s"key_$i vector wrong (ordinal mismatch?): expected ${i.toFloat}, got 
${vec.head}")
+    }
+
+    // Project only the vector column (ordinal shifts to 0 in projected schema)
+    val embOnly = readDf.select("id", "embedding").collect()
+      .map(r => r.getString(0) -> r.getSeq[Float](1)).toMap
+    for (i <- 0 until 10) {
+      assertTrue(embOnly(s"key_$i").forall(_ == i.toFloat),
+        s"key_$i projected-only vector wrong")
+    }
+  }
+
+  /**
+   * Schema evolution: adding a new non-vector column to a table that already 
has a vector column
+   * should succeed. Old rows get null for the new column; vector data must be 
intact in all rows.
+   */
+  @Test
+  def testSchemaEvolutionAddColumnToVectorTable(): Unit = {
+    val metadata = vectorMetadata("VECTOR(4)")
+
+    val schemaV1 = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("embedding", ArrayType(FloatType, containsNull = false),
+        nullable = false, metadata),
+      StructField("ts", LongType, nullable = false)
+    ))
+
+    val data1 = (0 until 5).map { i =>
+      Row(s"key_$i", Array.fill(4)(i.toFloat).toSeq, i.toLong)
+    }
+    writeHudiTable(createVectorDf(schemaV1, data1), 
"schema_evolve_add_col_test",
+      basePath + "/schema_evolve_add", precombineField = "ts",
+      extraOpts = Map("hoodie.schema.on.read.enable" -> "true"))
+
+    // V2: add a new non-vector column
+    val schemaV2 = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("embedding", ArrayType(FloatType, containsNull = false),
+        nullable = false, metadata),
+      StructField("ts", LongType, nullable = false),
+      StructField("new_col", StringType, nullable = true)
+    ))
+
+    val data2 = (5 until 10).map { i =>
+      Row(s"key_$i", Array.fill(4)(i.toFloat).toSeq, i.toLong, s"v2_$i")
+    }
+    writeHudiTable(createVectorDf(schemaV2, data2), 
"schema_evolve_add_col_test",
+      basePath + "/schema_evolve_add", precombineField = "ts", mode = 
SaveMode.Append,
+      extraOpts = Map("hoodie.schema.on.read.enable" -> "true"))
+
+    val readDf = spark.read.format("hudi")
+      .option("hoodie.schema.on.read.enable", "true")
+      .load(basePath + "/schema_evolve_add")
+    assertEquals(10, readDf.count())
+
+    val rowMap = readDf.select("id", "embedding", "new_col").collect()
+      .map(r => r.getString(0) -> r).toMap
+
+    // Old rows (key_0..key_4): vector intact, new_col is null
+    for (i <- 0 until 5) {
+      val row = rowMap(s"key_$i")
+      val vec = row.getSeq[Float](1)
+      assertEquals(4, vec.size)
+      assertTrue(vec.forall(_ == i.toFloat), s"key_$i vector corrupted after 
schema evolution")
+      assertTrue(row.isNullAt(2), s"key_$i new_col should be null")
+    }
+    // New rows (key_5..key_9): vector intact, new_col has value
+    for (i <- 5 until 10) {
+      val row = rowMap(s"key_$i")
+      val vec = row.getSeq[Float](1)
+      assertEquals(4, vec.size)
+      assertTrue(vec.forall(_ == i.toFloat), s"key_$i vector corrupted after 
schema evolution")
+      assertEquals(s"v2_$i", row.getString(2), s"key_$i new_col wrong")
+    }
+  }
+
+  /**
+   * Deleting records from a table with a vector column should not affect 
remaining records.
+   */
+  @Test
+  def testDeleteFromVectorTable(): Unit = {
+    val metadata = vectorMetadata("VECTOR(4)")
+
+    val schema = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("embedding", ArrayType(FloatType, containsNull = false),
+        nullable = false, metadata),
+      StructField("ts", LongType, nullable = false)
+    ))
+
+    val data = (0 until 10).map { i =>
+      Row(s"key_$i", Array.fill(4)(i.toFloat).toSeq, i.toLong)
+    }
+    writeHudiTable(createVectorDf(schema, data), "delete_vector_test",
+      basePath + "/delete_vec", precombineField = "ts")
+
+    // Delete key_2, key_5, key_8
+    val deletedKeys = Set("key_2", "key_5", "key_8")
+    val deleteSchema = StructType(Seq(
+      StructField("id", StringType, nullable = false),
+      StructField("ts", LongType, nullable = false)
+    ))
+    val deleteData = deletedKeys.toSeq.map(k => Row(k, 999L))
+    writeHudiTable(createVectorDf(deleteSchema, deleteData), 
"delete_vector_test",
+      basePath + "/delete_vec", precombineField = "ts", mode = SaveMode.Append,
+      extraOpts = Map(OPERATION.key -> "delete"))
+
+    val readDf = readHudiTable(basePath + "/delete_vec")
+    assertEquals(7, readDf.count(), "Deleted rows should be gone")
+
+    val rowMap = readDf.select("id", "embedding").collect()
+      .map(r => r.getString(0) -> r.getSeq[Float](1)).toMap
+
+    // Deleted keys must not appear
+    deletedKeys.foreach { k =>
+      assertFalse(rowMap.contains(k), s"$k should have been deleted")
+    }
+
+    // Remaining keys must have correct vectors
+    val remaining = (0 until 10).map(i => 
s"key_$i").filterNot(deletedKeys.contains)
+    remaining.foreach { k =>
+      val i = k.stripPrefix("key_").toInt
+      val vec = rowMap(k)
+      assertEquals(4, vec.size, s"$k dimension wrong")
+      assertTrue(vec.forall(_ == i.toFloat), s"$k vector wrong after delete")
+    }
+  }
+
+  private def assertArrayEquals(expected: Array[Byte], actual: Array[Byte], 
message: String): Unit = {
+    assertEquals(expected.length, actual.length, s"$message: length mismatch")
+    expected.zip(actual).zipWithIndex.foreach { case ((e, a), idx) =>
+      assertEquals(e, a, s"$message: mismatch at index $idx")
+    }
+  }
+}

Reply via email to