voonhous commented on code in PR #18328:
URL: https://github.com/apache/hudi/pull/18328#discussion_r2993530241
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -290,4 +307,50 @@ object SparkFileFormatInternalRowReaderContext {
field.name.equals(ROW_INDEX_TEMPORARY_COLUMN_NAME)
}
+ /**
+ * Detects VECTOR columns from HoodieSchema.
+ * Delegates to [[VectorConversionUtils.detectVectorColumns]].
+ * @return Map of ordinal to Vector schema for VECTOR fields.
+ */
+ private[hudi] def detectVectorColumns(schema: HoodieSchema): Map[Int,
HoodieSchema.Vector] = {
+ VectorConversionUtils.detectVectorColumns(schema).asScala.map { case (k,
v) => (k.intValue(), v) }.toMap
+ }
+
+ /**
+ * Detects VECTOR columns from Spark StructType metadata.
+ * Delegates to [[VectorConversionUtils.detectVectorColumnsFromMetadata]].
+ * @return Map of ordinal to Vector schema for VECTOR fields.
+ */
+ def detectVectorColumnsFromMetadata(schema: StructType): Map[Int,
HoodieSchema.Vector] = {
+ VectorConversionUtils.detectVectorColumnsFromMetadata(schema).asScala.map
{ case (k, v) => (k.intValue(), v) }.toMap
+ }
+
+ /**
+ * Replaces ArrayType with BinaryType for VECTOR columns so the Parquet
reader
+ * can read FIXED_LEN_BYTE_ARRAY data without type mismatch.
+ * Delegates to [[VectorConversionUtils.replaceVectorColumnsWithBinary]].
+ */
+ def replaceVectorColumnsWithBinary(structType: StructType, vectorColumns:
Map[Int, HoodieSchema.Vector]): StructType = {
+ val javaMap = vectorColumns.map { case (k, v) => (Integer.valueOf(k),
v.asInstanceOf[AnyRef]) }.asJava
+ VectorConversionUtils.replaceVectorColumnsWithBinary(structType, javaMap)
+ }
+
+ /**
+ * Wraps an iterator to convert binary VECTOR columns back to typed arrays.
+ * Unpacks bytes from FIXED_LEN_BYTE_ARRAY into GenericArrayData using the
canonical vector byte order.
+ */
+ private[hudi] def wrapWithVectorConversion(
+ iterator: ClosableIterator[InternalRow],
+ vectorColumns: Map[Int, HoodieSchema.Vector],
+ readSchema: StructType): ClosableIterator[InternalRow] = {
+ val javaVectorCols: java.util.Map[Integer, HoodieSchema.Vector] =
+ vectorColumns.map { case (k, v) => (Integer.valueOf(k), v) }.asJava
+ val mapper = VectorConversionUtils.buildRowMapper(readSchema,
javaVectorCols, row => row)
Review Comment:
This looks unsafe. The callback here is the identity function, i.e. it returns
the row as the converted result. `buildRowMapper` reuses a single
GenericInternalRow buffer across all calls, so if a downstream operator retains
a reference to a previous row, it would silently see overwritten data, causing
SDC (silent data corruption) / correctness issues.
There are other call sites, e.g. HoodieSparkParquetReader.java that are
doing this:
```java
VectorConversionUtils.buildRowMapper(readStructSchema, vectorColumnInfo,
vectorProjection::apply);
```
Maybe we can do something similar and add:
```scala
val projection = UnsafeProjection.create(readSchema)
val mapper = VectorConversionUtils.buildRowMapper(readSchema,
javaVectorCols, projection.apply(_))
```
##########
hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java:
##########
@@ -248,8 +248,20 @@ private Type convertField(String fieldName, HoodieSchema
schema, Type.Repetition
builder = Types.primitive(FIXED_LEN_BYTE_ARRAY,
repetition).length(schema.getFixedSize());
}
break;
+ case VECTOR:
+ // Vectors are stored as bare FIXED_LEN_BYTE_ARRAY without a Parquet
logical type annotation.
+ // Vector semantics (dimension, element type) are resolved from
HoodieSchema (the table's
+ // stored schema), not from the Parquet file schema. The reverse
direction
+ // (FIXED_LEN_BYTE_ARRAY → HoodieSchema) currently maps to generic
FIXED; this is
+ // acceptable because the read path detects vectors from the
HoodieSchema, not from Parquet.
+ // TODO: Consider adding VectorLogicalTypeAnnotation for fully
self-describing Parquet files.
+ HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) schema;
+ int fixedSize = vectorSchema.getDimension()
+ * vectorSchema.getVectorElementType().getElementSize();
+ builder = Types.primitive(FIXED_LEN_BYTE_ARRAY,
repetition).length(fixedSize);
+ break;
case UNION:
- return convertUnion(fieldName, schema, repetition, schemaPath);
+ return convertUnion(fieldName, schema, repetition, schemaPath);
Review Comment:
NIT: Don't think this is a required change.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/VectorConversionUtils.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+
+import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.types.BinaryType$;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
+/**
+ * Shared utility methods for vector column handling during Parquet read/write.
+ *
+ * Vectors are stored as Parquet FIXED_LEN_BYTE_ARRAY columns. On read, Spark
maps these
+ * to BinaryType. This class provides the canonical conversion between the
binary
+ * representation and Spark's typed ArrayData (float[], double[], byte[]).
+ *
+ */
+public final class VectorConversionUtils {
+
+ private VectorConversionUtils() {
+ }
+
+ /**
+ * Detects VECTOR columns in a HoodieSchema record and returns a map of
field ordinal
+ * to the corresponding {@link HoodieSchema.Vector} schema.
+ *
+ * @param schema a HoodieSchema of type RECORD (or null)
+ * @return map from field index to Vector schema; empty map if schema is
null or has no vectors
+ */
+ public static Map<Integer, HoodieSchema.Vector>
detectVectorColumns(HoodieSchema schema) {
+ Map<Integer, HoodieSchema.Vector> vectorColumnInfo = new HashMap<>();
+ if (schema == null) {
+ return vectorColumnInfo;
+ }
+ List<HoodieSchemaField> fields = schema.getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ HoodieSchema fieldSchema = fields.get(i).schema().getNonNullType();
+ if (fieldSchema.getType() == HoodieSchemaType.VECTOR) {
+ vectorColumnInfo.put(i, (HoodieSchema.Vector) fieldSchema);
+ }
+ }
+ return vectorColumnInfo;
+ }
+
+ /**
+ * Detects VECTOR columns from Spark StructType metadata annotations.
+ * Fields with metadata key {@link HoodieSchema#TYPE_METADATA_FIELD}
starting with "VECTOR"
+ * are parsed and included.
+ *
+ * @param schema Spark StructType
+ * @return map from field index to Vector schema; empty map if no vectors
found
+ */
+ public static Map<Integer, HoodieSchema.Vector>
detectVectorColumnsFromMetadata(StructType schema) {
+ Map<Integer, HoodieSchema.Vector> vectorColumnInfo = new HashMap<>();
+ if (schema == null) {
+ return vectorColumnInfo;
+ }
+ StructField[] fields = schema.fields();
+ for (int i = 0; i < fields.length; i++) {
+ StructField field = fields[i];
+ if (field.metadata().contains(HoodieSchema.TYPE_METADATA_FIELD)) {
+ String typeStr =
field.metadata().getString(HoodieSchema.TYPE_METADATA_FIELD);
+ if (typeStr.startsWith("VECTOR")) {
+ HoodieSchema parsed = HoodieSchema.parseTypeDescriptor(typeStr);
+ if (parsed.getType() == HoodieSchemaType.VECTOR) {
+ vectorColumnInfo.put(i, (HoodieSchema.Vector) parsed);
+ }
+ }
+ }
+ }
+ return vectorColumnInfo;
+ }
+
+ /**
+ * Replaces ArrayType with BinaryType for VECTOR columns so the Parquet
reader
+ * can read FIXED_LEN_BYTE_ARRAY data without type mismatch.
+ *
+ * @param structType the original Spark schema
+ * @param vectorColumns map of ordinal to vector info (only the key set is
used)
+ * @return a new StructType with vector columns replaced by BinaryType
+ */
+ public static StructType replaceVectorColumnsWithBinary(StructType
structType, Map<Integer, ?> vectorColumns) {
+ StructField[] fields = structType.fields();
+ StructField[] newFields = new StructField[fields.length];
+ for (int i = 0; i < fields.length; i++) {
+ if (vectorColumns.containsKey(i)) {
+ // Preserve the original field metadata (including hudi_type) so that
downstream code
+ // calling detectVectorColumnsFromMetadata on the modified schema
still finds vectors.
+ newFields[i] = new StructField(fields[i].name(), BinaryType$.MODULE$,
fields[i].nullable(), fields[i].metadata());
+ } else {
+ newFields[i] = fields[i];
+ }
+ }
+ return new StructType(newFields);
+ }
+
+ /**
+ * Converts binary bytes from a FIXED_LEN_BYTE_ARRAY Parquet column back to
a typed array
+ * based on the vector's element type and dimension.
+ *
+ * @param bytes raw bytes read from Parquet
+ * @param vectorSchema the vector schema describing dimension and element
type
+ * @return an ArrayData containing the decoded float[], double[], or byte[]
array
+ * @throws IllegalArgumentException if byte array length doesn't match
expected size
+ */
+ public static ArrayData convertBinaryToVectorArray(byte[] bytes,
HoodieSchema.Vector vectorSchema) {
+ return convertBinaryToVectorArray(bytes, vectorSchema.getDimension(),
vectorSchema.getVectorElementType());
+ }
+
+ /**
+ * Converts binary bytes from a FIXED_LEN_BYTE_ARRAY Parquet column back to
a typed array.
+ *
+ * @param bytes raw bytes read from Parquet
+ * @param dim vector dimension (number of elements)
+ * @param elemType element type (FLOAT, DOUBLE, or INT8)
+ * @return an ArrayData containing the decoded float[], double[], or byte[]
array
+ * @throws IllegalArgumentException if byte array length doesn't match
expected size
+ */
+ public static ArrayData convertBinaryToVectorArray(byte[] bytes, int dim,
+
HoodieSchema.Vector.VectorElementType elemType) {
+ int expectedSize = dim * elemType.getElementSize();
+ checkArgument(bytes.length == expectedSize,
+ "Vector byte array length mismatch: expected " + expectedSize + " but
got " + bytes.length);
+ ByteBuffer buffer =
ByteBuffer.wrap(bytes).order(HoodieSchema.VectorLogicalType.VECTOR_BYTE_ORDER);
+ switch (elemType) {
+ case FLOAT:
+ float[] floats = new float[dim];
+ for (int j = 0; j < dim; j++) {
+ floats[j] = buffer.getFloat();
+ }
+ return new GenericArrayData(floats);
+ case DOUBLE:
+ double[] doubles = new double[dim];
+ for (int j = 0; j < dim; j++) {
+ doubles[j] = buffer.getDouble();
+ }
+ return new GenericArrayData(doubles);
+ case INT8:
+ byte[] int8s = new byte[dim];
+ buffer.get(int8s);
+ // Use UnsafeArrayData to avoid boxing each byte into a Byte object.
+ // GenericArrayData(byte[]) would box every element into Object[].
+ return UnsafeArrayData.fromPrimitiveArray(int8s);
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported vector element type: " + elemType);
+ }
+ }
+
+ /**
+ * Returns a {@link Function} that converts a single {@link InternalRow} by
converting binary
+ * vector columns back to typed arrays and then applying the given
projection callback.
+ *
+ * <p>Ordinals in {@code vectorColumns} must be relative to {@code
readSchema} — the schema
+ * that has {@code BinaryType} for vector columns (as produced by
+ * {@link #replaceVectorColumnsWithBinary}).
+ *
+ * <p><b>Thread safety:</b> The returned function is NOT thread-safe; it
reuses a single
+ * {@link GenericInternalRow} buffer across calls. Each call to this factory
creates its own
+ * buffer, so separate functions returned by separate calls are independent.
+ *
+ * @param readSchema the Spark schema of incoming rows (BinaryType
for vector columns)
+ * @param vectorColumns map of ordinal → Vector schema for vector
columns, keyed by
+ * ordinals relative to {@code readSchema}
+ * @param projectionCallback called with the converted {@link
GenericInternalRow}; must copy
+ * any data it needs to retain (e.g. {@code
UnsafeProjection::apply})
+ * @return a function that converts one row and returns the projected result
+ */
+ public static Function<InternalRow, InternalRow> buildRowMapper(
+ StructType readSchema,
+ Map<Integer, HoodieSchema.Vector> vectorColumns,
+ Function<InternalRow, InternalRow> projectionCallback) {
+ GenericInternalRow converted = new
GenericInternalRow(readSchema.fields().length);
+ return row -> {
+ convertRowVectorColumns(row, converted, readSchema, vectorColumns);
+ return projectionCallback.apply(converted);
+ };
+ }
+
+ /**
+ * Delegates to {@link
HoodieSchema#buildVectorColumnsMetadataValue(HoodieSchema)}.
+ */
+ public static String buildVectorColumnsMetadataValue(HoodieSchema schema) {
+ return HoodieSchema.buildVectorColumnsMetadataValue(schema);
+ }
Review Comment:
Nit: Unnecessary delegation, doesn't seem to be required. Feel free to
ignore since this is a nit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]