voonhous commented on code in PR #18190:
URL: https://github.com/apache/hudi/pull/18190#discussion_r2888763293
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -196,25 +233,29 @@ object HoodieSparkSchemaConverters {
val newRecordNames = existingRecordNames + fullName
val fields = hoodieSchema.getFields.asScala.map { f =>
val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
Review Comment:
CMIIW, `#toSqlTypeHelper` already injects the vector-related metadata, and line 243
writes it again. There is a double write operation here; is this intended?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -196,25 +233,29 @@ object HoodieSparkSchemaConverters {
val newRecordNames = existingRecordNames + fullName
val fields = hoodieSchema.getFields.asScala.map { f =>
val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
- val commentMetadata = if (f.doc().isPresent &&
!f.doc().get().isEmpty) {
- new MetadataBuilder().putString("comment", f.doc().get()).build()
- } else {
- Metadata.empty
- }
val fieldSchema = f.getNonNullSchema
- val metadata = if (fieldSchema.isBlobField) {
- // Mark blob fields with metadata for identification.
- // This assumes blobs are always part of a record and not the top
level schema itself
- new MetadataBuilder()
- .withMetadata(commentMetadata)
- .putString(HoodieSchema.TYPE_METADATA_FIELD,
HoodieSchemaType.BLOB.name())
- .build()
- } else {
- commentMetadata
+ val metadataBuilder = new MetadataBuilder()
+ .withMetadata(schemaType.metadata.getOrElse(Metadata.empty))
+ if (f.doc().isPresent && f.doc().get().nonEmpty) {
+ metadataBuilder.putString("comment", f.doc().get())
+ }
+ if (fieldSchema.getType == HoodieSchemaType.VECTOR) {
+ metadataBuilder.putString(HoodieSchema.TYPE_METADATA_FIELD,
fieldSchema.toTypeString())
+ } else if (fieldSchema.isBlobField) {
+ metadataBuilder.putString(HoodieSchema.TYPE_METADATA_FIELD,
HoodieSchemaType.BLOB.name())
Review Comment:
Blob uses `#name` while vector uses `#toTypeString`.
This is fine as long as Blob doesn't take parameters the way vectors do, i.e.
vectors are `VECTOR(1024)` while blobs are always `BLOB` and never `BLOB(1024)`.
However, the OCD in me feels this should be standardised. Let's use
`#toTypeString`?
NIT: same as the above comment (Scala call style): we don't need the parentheses
once the relevant changes are made here.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -79,6 +80,33 @@ object HoodieSparkSchemaConverters {
HoodieSchema.createDecimal(name, nameSpace, null, d.precision,
d.scale, fixedSize)
// Complex types
+ case ArrayType(elementSparkType, containsNull)
+ if metadata.contains(HoodieSchema.TYPE_METADATA_FIELD) &&
+
metadata.getString(HoodieSchema.TYPE_METADATA_FIELD).startsWith("VECTOR") =>
+ if (containsNull) {
+ throw new HoodieSchemaException(
+ s"VECTOR type does not support nullable elements (field:
$recordName)")
+ }
+
+ val vectorSchema = HoodieSchema
+
.parseTypeString(metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+ .asInstanceOf[HoodieSchema.Vector]
+ val dimension = vectorSchema.getDimension
+ if (dimension <= 0) {
+ throw new HoodieSchemaException(
+ s"VECTOR dimension must be positive, got: $dimension (field:
$recordName)")
+ }
Review Comment:
Dead code, `parseTypeString` already calls `createVector(dimension, ...)`
which validates `dimension > 0`.
##########
hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala:
##########
@@ -212,6 +215,41 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
updater.setDecimal(ordinal, decimal)
+ // Handle VECTOR logical type (FLOAT, DOUBLE, INT8)
+ case (FIXED, ArrayType(elementType, false)) => avroType.getLogicalType
match {
+ case vectorLogicalType: VectorLogicalType =>
+ val dimension = vectorLogicalType.getDimension
+ val elementSize = elementType match {
+ case FloatType => 4
+ case DoubleType => 8
+ case ByteType => 1
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+ }
+ (updater, ordinal, value) => {
+ val bytes = value.asInstanceOf[GenericData.Fixed].bytes()
+ val expectedSize = dimension * elementSize
+ if (bytes.length != expectedSize) {
+ throw new IncompatibleSchemaException(
+ s"VECTOR byte size mismatch: expected=$expectedSize,
actual=${bytes.length}")
+ }
+ elementType match {
+ case FloatType =>
+ val buffer =
ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
Review Comment:
Design-wise, `LITTLE_ENDIAN` is fine, but we should document this somewhere,
or make it a named constant in VectorLogicalType that is reused everywhere to
prevent copy-paste errors. To me, this is just a code smell.
Let's also document this, and we should ensure that there are tests verifying
that this byte order works, with explicit byte-level assertions.
I don't think anyone is going to write a custom reader, but I feel it's still
a good cleanup.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
Review Comment:
There's no test covering the `UNION+VECTOR+nullable` combination. Add a test
to guard against future regressions (e.g., someone refactoring SchemaType or
changing the UNION path).
Just want to make sure that the copy does not silently drop vector metadata for
nullable vectors.
##########
hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala:
##########
@@ -212,6 +215,41 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
updater.setDecimal(ordinal, decimal)
+ // Handle VECTOR logical type (FLOAT, DOUBLE, INT8)
+ case (FIXED, ArrayType(elementType, false)) => avroType.getLogicalType
match {
+ case vectorLogicalType: VectorLogicalType =>
+ val dimension = vectorLogicalType.getDimension
+ val elementSize = elementType match {
+ case FloatType => 4
+ case DoubleType => 8
+ case ByteType => 1
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+ }
+ (updater, ordinal, value) => {
+ val bytes = value.asInstanceOf[GenericData.Fixed].bytes()
+ val expectedSize = dimension * elementSize
Review Comment:
Will there be any int overflow issues here?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -196,25 +233,29 @@ object HoodieSparkSchemaConverters {
val newRecordNames = existingRecordNames + fullName
val fields = hoodieSchema.getFields.asScala.map { f =>
val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
- val commentMetadata = if (f.doc().isPresent &&
!f.doc().get().isEmpty) {
- new MetadataBuilder().putString("comment", f.doc().get()).build()
- } else {
- Metadata.empty
- }
val fieldSchema = f.getNonNullSchema
- val metadata = if (fieldSchema.isBlobField) {
- // Mark blob fields with metadata for identification.
- // This assumes blobs are always part of a record and not the top
level schema itself
- new MetadataBuilder()
- .withMetadata(commentMetadata)
- .putString(HoodieSchema.TYPE_METADATA_FIELD,
HoodieSchemaType.BLOB.name())
- .build()
- } else {
- commentMetadata
+ val metadataBuilder = new MetadataBuilder()
+ .withMetadata(schemaType.metadata.getOrElse(Metadata.empty))
+ if (f.doc().isPresent && f.doc().get().nonEmpty) {
+ metadataBuilder.putString("comment", f.doc().get())
+ }
+ if (fieldSchema.getType == HoodieSchemaType.VECTOR) {
+ metadataBuilder.putString(HoodieSchema.TYPE_METADATA_FIELD,
fieldSchema.toTypeString())
Review Comment:
NIT: since this is a scala file, let's keep call style consistent.
`toTypeString` without the parenthesis.
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -84,18 +86,118 @@
*/
public class HoodieSchema implements Serializable {
private static final long serialVersionUID = 1L;
+
/**
* Constant representing a null JSON value, equivalent to
JsonProperties.NULL_VALUE.
* This provides compatibility with Avro's JsonProperties while maintaining
Hudi's API.
*/
public static final Object NULL_VALUE = JsonProperties.NULL_VALUE;
public static final HoodieSchema NULL_SCHEMA =
HoodieSchema.create(HoodieSchemaType.NULL);
-
/**
* Constant to use when attaching type metadata to external schema systems
like Spark's StructType.
+ * Stores a parameterized type string for custom Hudi logical types such as
VECTOR and BLOB.
+ * Examples: "VECTOR(128)", "VECTOR(512, DOUBLE)", "BLOB".
*/
public static final String TYPE_METADATA_FIELD = "hudi_type";
+ /**
+ * Converts a HoodieSchema to its parameterized type string for custom Hudi
logical types
+ * such as VECTOR and BLOB. Only supports custom logical types — throws for
standard types.
+ * Parameterized types include positional parameters: "VECTOR(128)",
"VECTOR(128, DOUBLE)".
+ * Default parameters are omitted: VECTOR(dim) implies elementType=FLOAT.
+ */
+ public String toTypeString() {
+ HoodieSchemaType type = getType();
+ switch (type) {
+ case VECTOR:
+ Vector v = (Vector) this;
+ if (v.getVectorElementType() == Vector.VectorElementType.FLOAT) {
+ return "VECTOR(" + v.getDimension() + ")";
+ }
+ return "VECTOR(" + v.getDimension() + ", " + v.getVectorElementType()
+ ")";
+ case BLOB:
+ return "BLOB";
+ default:
+ throw new IllegalArgumentException(
+ "toTypeString only supports custom logical types, got: " + type);
+ }
+ }
Review Comment:
This is a method on `HoodieSchema`, so if `intHoodieSchema` invokes this
method, an `IllegalArgumentException` will be thrown, right?
If we do not want to expand this to all types, we can limit this to
`static formatTypeString(VectorHoodieSchema)` or something.
Or follow the OOP way, where we define an abstract class and it is up to
the child classes to override.
##########
hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala:
##########
@@ -212,6 +215,41 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
updater.setDecimal(ordinal, decimal)
+ // Handle VECTOR logical type (FLOAT, DOUBLE, INT8)
+ case (FIXED, ArrayType(elementType, false)) => avroType.getLogicalType
match {
+ case vectorLogicalType: VectorLogicalType =>
+ val dimension = vectorLogicalType.getDimension
+ val elementSize = elementType match {
+ case FloatType => 4
+ case DoubleType => 8
+ case ByteType => 1
+ case _ => throw new IncompatibleSchemaException(incompatibleMsg)
Review Comment:
These hardcoded magic numbers seem fragile to me too.
`org.apache.hudi.common.schema.HoodieSchema.Vector.VectorElementType`
already has getElementSize, so let's use the enum's defined sizes.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -79,6 +80,33 @@ object HoodieSparkSchemaConverters {
HoodieSchema.createDecimal(name, nameSpace, null, d.precision,
d.scale, fixedSize)
// Complex types
+ case ArrayType(elementSparkType, containsNull)
+ if metadata.contains(HoodieSchema.TYPE_METADATA_FIELD) &&
+
metadata.getString(HoodieSchema.TYPE_METADATA_FIELD).startsWith("VECTOR") =>
Review Comment:
Let's standardise with line #119.
`metadata.getString(HoodieSchema.TYPE_METADATA_FIELD).startsWith("VECTOR")`
is fragile.
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -84,18 +86,118 @@
*/
public class HoodieSchema implements Serializable {
private static final long serialVersionUID = 1L;
+
/**
* Constant representing a null JSON value, equivalent to
JsonProperties.NULL_VALUE.
* This provides compatibility with Avro's JsonProperties while maintaining
Hudi's API.
*/
public static final Object NULL_VALUE = JsonProperties.NULL_VALUE;
public static final HoodieSchema NULL_SCHEMA =
HoodieSchema.create(HoodieSchemaType.NULL);
-
/**
* Constant to use when attaching type metadata to external schema systems
like Spark's StructType.
+ * Stores a parameterized type string for custom Hudi logical types such as
VECTOR and BLOB.
+ * Examples: "VECTOR(128)", "VECTOR(512, DOUBLE)", "BLOB".
*/
public static final String TYPE_METADATA_FIELD = "hudi_type";
+ /**
+ * Converts a HoodieSchema to its parameterized type string for custom Hudi
logical types
+ * such as VECTOR and BLOB. Only supports custom logical types — throws for
standard types.
+ * Parameterized types include positional parameters: "VECTOR(128)",
"VECTOR(128, DOUBLE)".
+ * Default parameters are omitted: VECTOR(dim) implies elementType=FLOAT.
+ */
+ public String toTypeString() {
+ HoodieSchemaType type = getType();
+ switch (type) {
+ case VECTOR:
+ Vector v = (Vector) this;
+ if (v.getVectorElementType() == Vector.VectorElementType.FLOAT) {
+ return "VECTOR(" + v.getDimension() + ")";
+ }
+ return "VECTOR(" + v.getDimension() + ", " + v.getVectorElementType()
+ ")";
+ case BLOB:
+ return "BLOB";
+ default:
+ throw new IllegalArgumentException(
+ "toTypeString only supports custom logical types, got: " + type);
+ }
+ }
+
+ /**
+ * Parses a parameterized type string for custom Hudi logical types such as
VECTOR and BLOB.
+ * Examples: "VECTOR(128)" or "VECTOR(512, DOUBLE)".
+ * Throws for non-custom logical type names.
+ */
+ public static HoodieSchema parseTypeString(String descriptor) {
+ Pair<HoodieSchemaType, List<String>> parsedDescriptor =
parseTypeDescriptor(descriptor);
+ HoodieSchemaType type = parsedDescriptor.getLeft();
+ List<String> params = parsedDescriptor.getRight();
+ switch (type) {
+ case VECTOR:
+ if (params.isEmpty()) {
+ throw new IllegalArgumentException("VECTOR type descriptor must
include a dimension parameter");
+ }
+ if (params.size() > 2) {
+ throw new IllegalArgumentException(
+ "VECTOR type descriptor supports at most 2 parameters: dimension
and optional element type");
+ }
+ int dimension;
+ try {
+ dimension = Integer.parseInt(params.get(0));
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid VECTOR dimension: " +
params.get(0), e);
+ }
+ Vector.VectorElementType elementType = params.size() > 1
+ ? Vector.VectorElementType.fromString(params.get(1))
+ : Vector.VectorElementType.FLOAT;
+ return createVector(dimension, elementType);
+ case BLOB:
+ if (!params.isEmpty()) {
+ throw new IllegalArgumentException(
+ "BLOB type descriptor does not support parameters, got: " +
params);
+ }
+ return createBlob();
+ default:
+ throw new IllegalArgumentException(
+ "parseTypeString only supports custom logical types, got: " +
type);
+ }
+ }
+
+ private static Pair<HoodieSchemaType, List<String>>
parseTypeDescriptor(String descriptor) {
+ ValidationUtils.checkArgument(descriptor != null &&
!descriptor.trim().isEmpty(),
+ "Type descriptor cannot be null or empty");
+ int parenStart = descriptor.indexOf('(');
+ String typeName;
+ List<String> params;
+ if (parenStart == -1) {
+ typeName = descriptor.trim();
+ params = Collections.emptyList();
+ } else {
+ if (!descriptor.endsWith(")")) {
+ throw new IllegalArgumentException("Malformed type descriptor, missing
closing ')': " + descriptor);
+ }
+ typeName = descriptor.substring(0, parenStart).trim();
+ String paramStr = descriptor.substring(parenStart + 1,
descriptor.length() - 1).trim();
+ if (paramStr.isEmpty()) {
+ params = Collections.emptyList();
+ } else {
+ params = Arrays.stream(paramStr.split(","))
+ .map(String::trim)
+ .collect(Collectors.toList());
+ }
+ }
+ HoodieSchemaType type =
HoodieSchemaType.valueOf(typeName.toUpperCase(Locale.ROOT));
+ if (!CUSTOM_LOGICAL_TYPES.contains(type)) {
+ throw new IllegalArgumentException(
+ "parseTypeString only supports custom logical types, got: " + type);
+ }
Review Comment:
```
try {
type = HoodieSchemaType.valueOf(typeName.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Unknown Hudi schema type: " +
typeName, e);
}
```
This would be better than letting the generic JVM message be thrown
when the enum's `valueOf` cannot find a matching constant.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]