hudi-agent commented on code in PR #18723:
URL: https://github.com/apache/hudi/pull/18723#discussion_r3314942898
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java:
##########
@@ -78,16 +82,55 @@ public interface AvroToRowDataConverter extends Serializable {
// -------------------------------------------------------------------------------------
// Runtime Converters
// -------------------------------------------------------------------------------------
+
+ /**
+ * Creates a row converter from the Flink row type using UTC timezone conversion.
+ *
+ * <p>Note that this RowType-only path cannot recover Hoodie-specific logical type metadata
+ * that is not represented in Flink's {@link RowType}.
+ */
public static AvroToRowDataConverter createRowConverter(RowType rowType) {
return createRowConverter(rowType, true);
}
+ /**
+ * Creates a row converter from the Flink row type.
+ *
+ * <p>Note that this RowType-only path cannot recover Hoodie-specific logical type metadata
+ * that is not represented in Flink's {@link RowType}.
+ */
public static AvroToRowDataConverter createRowConverter(RowType rowType, boolean utcTimezone) {
- final AvroToRowDataConverter[] fieldConverters =
- rowType.getFields().stream()
- .map(RowType.RowField::getType)
- .map(type -> AvroToRowDataConverters.createNullableConverter(type, utcTimezone))
- .toArray(AvroToRowDataConverter[]::new);
+ return createRowConverter(HoodieSchemaConverter.convertToSchema(rowType), rowType, utcTimezone);
+ }
+
+ /**
+ * Creates a row converter using only the Flink row type.
+ *
+ * <p>This converter cannot recover Hoodie-specific logical type metadata from {@link RowType}.
+ * Use {@link #createRowConverter(HoodieSchema, RowType, boolean)} when a Hoodie schema is
+ * available, especially for VECTOR columns.
+ */
+ public static AvroToRowDataConverter createRowConverter(HoodieSchema hoodieSchema) {
Review Comment:
🤖 nit: the Javadoc on this method says "Creates a row converter using only
the Flink row type" but the method takes a `HoodieSchema`, not a `RowType` — it
looks like the description was copied from one of the `RowType` overloads and
not updated. Could you update it to something like "Creates a row converter
from the given `HoodieSchema`, deriving the Flink row type internally"?
- AI-generated; verify before applying.
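The following is a minimal sketch of the suggested Javadoc wording. It assumes the `HoodieSchema` overload derives the row type from the schema and then delegates to a schema-plus-row-type overload. `RowConverterJavadocSketch`, `SchemaStub`, `RowTypeStub` and `ConverterStub` are hypothetical stand-ins, not Hudi or Flink types, so the snippet compiles without either dependency:

```java
import java.util.List;

/** Hypothetical stand-ins used only to illustrate the suggested Javadoc wording. */
final class RowConverterJavadocSketch {

  /** Hypothetical stand-in for HoodieSchema: an ordered list of field names. */
  record SchemaStub(List<String> fieldNames) {}

  /** Hypothetical stand-in for a Flink RowType derived from the schema. */
  record RowTypeStub(List<String> fieldNames) {}

  /** Hypothetical converter handle that records the inputs it was built from. */
  record ConverterStub(SchemaStub schema, RowTypeStub rowType) {}

  private RowConverterJavadocSketch() {
  }

  /**
   * Creates a row converter from the given schema, deriving the row type internally.
   */
  static ConverterStub createRowConverter(SchemaStub schema) {
    // The caller supplies only the schema; the row type is derived here.
    RowTypeStub rowType = new RowTypeStub(List.copyOf(schema.fieldNames()));
    return createRowConverter(schema, rowType);
  }

  /** Creates a row converter from both the schema and an explicit row type. */
  static ConverterStub createRowConverter(SchemaStub schema, RowTypeStub rowType) {
    return new ConverterStub(schema, rowType);
  }
}
```

The first Javadoc sentence now names the parameter the method actually takes and states where the row type comes from.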
##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieVectorUtils.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utilities for decoding Hudi VECTOR fixed-bytes payloads.
+ */
+public final class HoodieVectorUtils {
+
+ private HoodieVectorUtils() {
+ }
+
+ /**
+ * Detects VECTOR columns in a HoodieSchema record and returns a map of field ordinal
+ * to the corresponding {@link HoodieSchema.Vector} schema.
+ *
+ * @param schema a HoodieSchema of type RECORD (or null)
+ * @return map from field index to Vector schema; empty map if schema is null or has no vectors
+ */
+ public static Map<Integer, HoodieSchema.Vector> detectVectorColumns(HoodieSchema schema) {
+ Map<Integer, HoodieSchema.Vector> vectorColumnInfo = new LinkedHashMap<>();
+ if (schema == null) {
+ return vectorColumnInfo;
+ }
+ List<HoodieSchemaField> fields = schema.getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ HoodieSchema fieldSchema = fields.get(i).schema().getNonNullType();
+ if (fieldSchema.getType() == HoodieSchemaType.VECTOR) {
+ vectorColumnInfo.put(i, (HoodieSchema.Vector) fieldSchema);
+ }
+ }
+ return vectorColumnInfo;
+ }
+
+ /**
+ * Converts binary bytes from a FIXED_LEN_BYTE_ARRAY Parquet column back to a typed array.
+ *
+ * @param bytes raw bytes read from Parquet
+ * @param vectorSchema vector schema
+ * @return an ArrayData containing the decoded float[], double[], or byte[] array
+ * @throws IllegalArgumentException if byte array length doesn't match expected size
+ */
+ public static Object decodeVectorBytes(byte[] bytes, HoodieSchema.Vector vectorSchema) {
Review Comment:
🤖 nit: the `@return` tag says "an ArrayData containing the decoded float[],
double[], or byte[] array" — but the method returns a raw `float[]`,
`double[]`, or `byte[]` boxed as `Object`, not a Flink/Spark `ArrayData`. Could
you update the description to something like "a `float[]`, `double[]`, or
`byte[]` (returned as `Object`) depending on the element type"? (Same issue on
the overload a few lines above.)
- AI-generated; verify before applying.
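Below is a standalone sketch of a decoder whose `@return` tag matches what it actually returns. Several details are assumptions for illustration and should be checked against the real writer:

- the hypothetical `ElementType` enum;
- the explicit `dimension` parameter;
- the little-endian byte order.

None of these come from `HoodieSchema.Vector` or from Hudi's on-disk layout.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/**
 * Sketch of a fixed-bytes vector decoder. ElementType, the dimension parameter, and the
 * little-endian byte order are assumptions, not Hudi's HoodieSchema.Vector API or layout.
 */
final class VectorDecodeSketch {

  /** Hypothetical element types mirroring the float/double/byte cases in the review. */
  enum ElementType { FLOAT, DOUBLE, INT8 }

  private VectorDecodeSketch() {
  }

  /**
   * Decodes a FIXED_LEN_BYTE_ARRAY payload into a primitive array.
   *
   * @param bytes raw bytes read from Parquet
   * @param elementType element type of the vector
   * @param dimension number of elements in the vector
   * @return a {@code float[]}, {@code double[]}, or {@code byte[]} (returned as
   *     {@code Object}) depending on the element type
   * @throws IllegalArgumentException if the byte array length doesn't match the expected size
   */
  static Object decodeVectorBytes(byte[] bytes, ElementType elementType, int dimension) {
    int elementSize = switch (elementType) {
      case FLOAT -> Float.BYTES;
      case DOUBLE -> Double.BYTES;
      case INT8 -> Byte.BYTES;
    };
    int expected = elementSize * dimension;
    if (bytes.length != expected) {
      throw new IllegalArgumentException(
          "Expected " + expected + " bytes for dimension " + dimension + " but got " + bytes.length);
    }
    // Assumption: little-endian element encoding; confirm against the actual writer.
    ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
    if (elementType == ElementType.FLOAT) {
      float[] out = new float[dimension];
      buffer.asFloatBuffer().get(out);
      return out;
    } else if (elementType == ElementType.DOUBLE) {
      double[] out = new double[dimension];
      buffer.asDoubleBuffer().get(out);
      return out;
    }
    // INT8: one byte per element, so a defensive copy is already the decoded array.
    return bytes.clone();
  }
}
```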
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/VectorConversionUtils.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.HoodieVectorUtils;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.SchemaCompatibilityException;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for reading Hoodie VECTOR columns into Flink {@link RowData}.
+ *
+ * <p>VECTOR columns are stored as fixed-length bytes in parquet/log files. The Flink reader
+ * reads those physical bytes first and then decodes them into Flink array data according to
+ * the vector element type declared in {@link HoodieSchema.Vector}.
+ */
+public final class VectorConversionUtils {
+
+ private VectorConversionUtils() {
+ }
+
+ /**
+ * Detects VECTOR columns in the selected read projection.
+ *
+ * <p>The returned map is keyed by the projected field ordinal, not the ordinal in the full
+ * table schema. This matches the ordinal layout of the {@link RowData} emitted by the reader.
+ *
+ * @param fullFieldNames field names in the full query/table row type
+ * @param selectedFields ordinals selected from {@code fullFieldNames}
+ * @param tableSchema hoodie table schema containing logical VECTOR type metadata
+ * @return projected ordinal to vector schema information
+ */
+ public static Map<Integer, HoodieSchema.Vector> detectVectorColumns(
+ String[] fullFieldNames,
+ int[] selectedFields,
+ HoodieSchema tableSchema) {
+ Map<String, HoodieSchema.Vector> vectorFields = getVectorFields(tableSchema);
+ Map<Integer, HoodieSchema.Vector> vectorColumnInfo = new LinkedHashMap<>();
+ for (int i = 0; i < selectedFields.length; i++) {
+ HoodieSchema.Vector vector = vectorFields.get(fullFieldNames[selectedFields[i]].toLowerCase(Locale.ROOT));
+ if (vector != null) {
+ vectorColumnInfo.put(i, vector);
+ }
+ }
+ return vectorColumnInfo;
+ }
+
+ /**
+ * Rewrites VECTOR fields in a requested row data type to BYTES for parquet reads.
+ *
+ * <p>Parquet stores VECTOR values as fixed-length byte arrays. This method keeps the requested
+ * row shape unchanged, but asks the physical parquet reader to materialize VECTOR columns as
+ * bytes. The resulting rows should be passed through
+ * {@link #wrapVectorColumnIterator(ClosableIterator, RowType, Map)} to decode the bytes into
+ * array data.
+ *
+ * @param dataType requested row data type
+ * @param requestedSchema requested Hudi schema
+ * @param vectorColumnInfo projected VECTOR column metadata
+ * @return row data type to use for the physical parquet read
+ */
+ public static DataType getParquetReadDataType(
+ DataType dataType,
+ HoodieSchema requestedSchema,
+ Map<Integer, HoodieSchema.Vector> vectorColumnInfo) {
+ if (vectorColumnInfo.isEmpty()) {
+ return dataType;
+ }
+
+ RowType rowType = (RowType) dataType.getLogicalType();
+ List<RowType.RowField> readFields = new ArrayList<>(rowType.getFields().size());
+ for (RowType.RowField field : rowType.getFields()) {
+ readFields.add(field.copy());
+ }
+ for (Integer requestedOrdinal : vectorColumnInfo.keySet()) {
+ String fieldName = requestedSchema.getFields().get(requestedOrdinal).name();
+ int fieldOrdinal = rowType.getFieldIndex(fieldName);
+ if (fieldOrdinal >= 0) {
+ RowType.RowField field = readFields.get(fieldOrdinal);
+ readFields.set(fieldOrdinal, newRowField(field, bytesType(field.getType())));
+ }
+ }
+ return DataTypes.of(new RowType(rowType.isNullable(), readFields));
+ }
+
+ /**
+ * Rewrites VECTOR field types in the full field type array to BYTES for parquet reads.
+ *
+ * <p>This variant is used by copy-on-write input format code paths that keep the full field
+ * type array and apply projection separately.
+ *
+ * @param fullFieldNames field names in the full query/table row type
+ * @param fullFieldTypes field types corresponding to {@code fullFieldNames}
+ * @param tableSchema hoodie table schema containing logical VECTOR type metadata
+ * @return field types to use for the physical parquet read
+ */
+ public static DataType[] getParquetReadFieldTypes(
+ String[] fullFieldNames,
+ DataType[] fullFieldTypes,
+ HoodieSchema tableSchema) {
+ Map<String, HoodieSchema.Vector> vectorFields = getVectorFields(tableSchema);
+ if (vectorFields.isEmpty()) {
+ return fullFieldTypes;
+ }
+ DataType[] readFieldTypes = Arrays.copyOf(fullFieldTypes, fullFieldTypes.length);
+ for (int i = 0; i < fullFieldNames.length; i++) {
+ if (vectorFields.containsKey(fullFieldNames[i].toLowerCase(Locale.ROOT))) {
+ readFieldTypes[i] = DataTypes.of(bytesType(fullFieldTypes[i].getLogicalType()));
+ }
+ }
+ return readFieldTypes;
+ }
+
+ /**
+ * Wraps a row iterator and decodes VECTOR byte values into Flink array values.
+ *
+ * <p>{@code rowType} must describe the physical rows emitted by {@code rowDataItr}, where VECTOR
+ * columns have already been read as BYTES. Non-vector fields are copied with type-aware Flink
+ * field getters to preserve their original representation.
+ *
+ * @param rowDataItr physical row iterator
+ * @param rowType row type of the physical rows emitted by {@code rowDataItr}
+ * @param vectorColumnInfo projected VECTOR column metadata keyed by row ordinal
+ * @return iterator emitting rows with VECTOR columns decoded as arrays
+ */
+ public static ClosableIterator<RowData> wrapVectorColumnIterator(
+ ClosableIterator<RowData> rowDataItr,
+ RowType rowType,
+ Map<Integer, HoodieSchema.Vector> vectorColumnInfo) {
+ RowData.FieldGetter[] fieldGetters = createFieldGetters(rowType);
+ return new CloseableMappingIterator<>(
+ rowDataItr, rowData -> convertVectorColumns(rowData, fieldGetters, vectorColumnInfo));
+ }
+
+ /**
+ * Wraps a projected row iterator and decodes VECTOR byte values into Flink array values.
+ *
+ * <p>The selected physical row type is derived from {@code fullFieldTypes} and
+ * {@code selectedFields}, then delegated to {@link #wrapVectorColumnIterator(ClosableIterator, RowType, Map)}.
+ *
+ * @param rowDataItr physical row iterator
+ * @param fullFieldTypes field types before projection
+ * @param selectedFields ordinals selected from {@code fullFieldTypes}
+ * @param vectorColumnInfo projected VECTOR column metadata keyed by row ordinal
+ * @return iterator emitting rows with VECTOR columns decoded as arrays
+ */
+ public static ClosableIterator<RowData> wrapVectorColumnIterator(
+ ClosableIterator<RowData> rowDataItr,
+ DataType[] fullFieldTypes,
+ int[] selectedFields,
+ Map<Integer, HoodieSchema.Vector> vectorColumnInfo) {
+ return wrapVectorColumnIterator(rowDataItr, createRowType(fullFieldTypes, selectedFields), vectorColumnInfo);
+ }
+
+ /**
+ * Returns VECTOR fields keyed by lower-cased field name.
+ */
+ private static Map<String, HoodieSchema.Vector> getVectorFields(HoodieSchema tableSchema) {
+ return tableSchema.getFields().stream()
+ .filter(field -> field.schema().getNonNullType().getType() == HoodieSchemaType.VECTOR)
+ .collect(Collectors.toMap(
+ field -> field.name().toLowerCase(Locale.ROOT),
+ field -> (HoodieSchema.Vector) field.schema().getNonNullType()));
+ }
+
+ /**
+ * Creates a BYTES logical type while preserving the original nullability.
+ */
+ private static LogicalType bytesType(LogicalType originalType) {
+ return DataTypes.BYTES().getLogicalType().copy(originalType.isNullable());
+ }
+
+ /**
+ * Creates a row field with a replacement logical type while preserving metadata.
+ */
+ private static RowType.RowField newRowField(RowType.RowField field, LogicalType type) {
+ return field.getDescription()
+ .map(description -> new RowType.RowField(field.getName(), type, description))
+ .orElseGet(() -> new RowType.RowField(field.getName(), type));
+ }
+
+ /**
+ * Converts VECTOR columns in one row from physical bytes to Flink array data.
+ */
+ private static RowData convertVectorColumns(
+ RowData rowData,
+ RowData.FieldGetter[] fieldGetters,
+ Map<Integer, HoodieSchema.Vector> vectorColumnInfo) {
+ GenericRowData converted = new GenericRowData(rowData.getArity());
+ converted.setRowKind(rowData.getRowKind());
+ for (int i = 0; i < rowData.getArity(); i++) {
+ if (rowData.isNullAt(i)) {
+ converted.setField(i, null);
+ } else if (vectorColumnInfo.containsKey(i)) {
+ converted.setField(i, createVectorArrayData(rowData.getBinary(i), vectorColumnInfo.get(i)));
+ } else {
+ converted.setField(i, fieldGetters[i].getFieldOrNull(rowData));
+ }
+ }
+ return converted;
+ }
+
+ /**
+ * Creates type-aware field getters for the physical row type.
+ */
+ private static RowData.FieldGetter[] createFieldGetters(RowType rowType) {
+ RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
+ for (int i = 0; i < fieldGetters.length; i++) {
+ fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
+ }
+ return fieldGetters;
+ }
+
+ /**
+ * Builds a projected row type from full field types and selected ordinals.
+ */
+ private static RowType createRowType(DataType[] fullFieldTypes, int[] selectedFields) {
+ List<RowType.RowField> rowFields = new ArrayList<>(selectedFields.length);
+ for (int i = 0; i < selectedFields.length; i++) {
+ rowFields.add(new RowType.RowField(String.valueOf(i), fullFieldTypes[selectedFields[i]].getLogicalType()));
+ }
+ return new RowType(rowFields);
+ }
+
+ /**
+ * Wraps a decoded primitive vector array as Flink array data.
+ */
+ public static GenericArrayData createVectorArrayData(byte[] bytes, HoodieSchema.Vector vectorSchema) {
+ Object vectorArray = HoodieVectorUtils.decodeVectorBytes(bytes, vectorSchema);
+ if (vectorArray instanceof float[]) {
+ return new GenericArrayData((float[]) vectorArray);
+ } else if (vectorArray instanceof double[]) {
+ return new GenericArrayData((double[]) vectorArray);
+ } else if (vectorArray instanceof byte[]) {
+ return new GenericArrayData((byte[]) vectorArray);
+ }
+ throw new UnsupportedOperationException("Unsupported decoded vector array type: " + vectorArray.getClass());
+ }
+
+ public static void validateVectorLogicalType(HoodieSchema.Vector vectorSchema, LogicalType type) {
Review Comment:
🤖 nit: every other public method in this class has a Javadoc, but
`validateVectorLogicalType` doesn't — it might be worth adding at least a
one-liner describing what it validates and that it throws
`SchemaCompatibilityException` on mismatch.
- AI-generated; verify before applying.
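Separately from the Javadoc nit, the `detectVectorColumns(String[], int[], HoodieSchema)` contract in this hunk can be illustrated in plain Java: it looks columns up by their full-schema name but keys the result by projected ordinal. The sketch below assumes a `Map<String, String>` of lower-cased field names to vector descriptors. That map is a hypothetical stand-in for the `HoodieSchema.Vector` lookup built by `getVectorFields`.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/**
 * Plain-Java sketch of projected-ordinal VECTOR detection. String descriptors stand in
 * for HoodieSchema.Vector (hypothetical); only the ordinal mapping is illustrated.
 */
final class ProjectedVectorDetectionSketch {

  private ProjectedVectorDetectionSketch() {
  }

  /**
   * @param fullFieldNames field names in the full table row type
   * @param selectedFields ordinals selected from {@code fullFieldNames}
   * @param vectorFieldsByLowerName VECTOR fields keyed by lower-cased name
   * @return projected ordinal to vector descriptor
   */
  static Map<Integer, String> detectVectorColumns(
      String[] fullFieldNames, int[] selectedFields, Map<String, String> vectorFieldsByLowerName) {
    Map<Integer, String> result = new LinkedHashMap<>();
    for (int i = 0; i < selectedFields.length; i++) {
      // Look up by the full-schema name, but key the result by the projected position i.
      String vector = vectorFieldsByLowerName.get(
          fullFieldNames[selectedFields[i]].toLowerCase(Locale.ROOT));
      if (vector != null) {
        result.put(i, vector);
      }
    }
    return result;
  }
}
```

For example, take full fields `id, name, Embedding, ts` with `selectedFields = {3, 2}`. The VECTOR column `Embedding` lands at projected ordinal 1, not 2, and ordinal 1 is the index a downstream row iterator sees.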
--
This is an automated message from the Apache Git Service.