vinothchandar commented on code in PR #18108:
URL: https://github.com/apache/hudi/pull/18108#discussion_r2827752859
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -79,20 +80,25 @@ object HoodieSparkSchemaConverters {
// Complex types
case ArrayType(elementType, containsNull) =>
- val elementSchema = toHoodieType(elementType, containsNull, recordName, nameSpace)
+ val elementSchema = toHoodieType(elementType, containsNull, recordName, nameSpace, metadata)
HoodieSchema.createArray(elementSchema)
case MapType(StringType, valueType, valueContainsNull) =>
- val valueSchema = toHoodieType(valueType, valueContainsNull, recordName, nameSpace)
+ val valueSchema = toHoodieType(valueType, valueContainsNull, recordName, nameSpace, metadata)
HoodieSchema.createMap(valueSchema)
+ case st: StructType if metadata.contains(HoodieSchema.TYPE_METADATA_FIELD) &&
Review Comment:
💅 **NIT**
rename `st` => `blobStruct`
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -941,6 +963,86 @@ public HoodieSchema getNonNullType() {
return HoodieSchema.createUnion(nonNullTypes);
}
+ boolean containsBlobType() {
+ if (getType() == HoodieSchemaType.BLOB) {
+ return true;
+ } else if (getType() == HoodieSchemaType.ARRAY) {
+ return getElementType().containsBlobType();
+ } else if (getType() == HoodieSchemaType.MAP) {
+ return getValueType().containsBlobType();
+ } else if (getType() == HoodieSchemaType.UNION) {
+ return getTypes().stream().anyMatch(HoodieSchema::containsBlobType);
+ } else if (hasFields()) {
+ return getFields().stream().anyMatch(field -> field.schema().containsBlobType());
+ }
+ return false;
+ }
+
+ /**
+ * A convenience method to check if the current field represents a blob type.
+ * This checks if the current schema is a BLOB or if it is an ARRAY or MAP whose element or value type is a BLOB, respectively.
+ * It does not check for BLOB types nested within unions or record fields.
+ * @return true if the current schema is a BLOB or an ARRAY/MAP of BLOBs, false otherwise
+ */
+ public boolean isBlobField() {
+ HoodieSchema nonNullSchema = getNonNullType();
+ HoodieSchemaType nonNullSchemaType = nonNullSchema.getType();
+ return nonNullSchemaType == HoodieSchemaType.BLOB
+ || (nonNullSchemaType == HoodieSchemaType.ARRAY && nonNullSchema.getElementType().getNonNullType().getType() == HoodieSchemaType.BLOB)
+ || (nonNullSchemaType == HoodieSchemaType.MAP && nonNullSchema.getValueType().getNonNullType().getType() == HoodieSchemaType.BLOB);
+ }
+
+ /**
+ * Validates that the schema does not contain variants with shredded blob types.
+ * This method recursively traverses the schema tree to check for invalid structures.
+ *
+ * @param schema the schema to validate
+ * @throws HoodieSchemaException if a variant's typed_value field is or contains a BLOB type
+ */
+ private static void validateNoBlobsInVariant(HoodieSchema schema) {
Review Comment:
💅 **NIT**
rename to `validateNoBlobsInsideVariant`. On first read, I parsed it as `invariant`.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -190,11 +196,21 @@ object HoodieSparkSchemaConverters {
val newRecordNames = existingRecordNames + fullName
val fields = hoodieSchema.getFields.asScala.map { f =>
val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
- val metadata = if (f.doc().isPresent && !f.doc().get().isEmpty) {
+ val commentMetadata = if (f.doc().isPresent && !f.doc().get().isEmpty) {
new MetadataBuilder().putString("comment", f.doc().get()).build()
} else {
Metadata.empty
}
+ val fieldSchema = f.getNonNullSchema
+ val metadata = if (fieldSchema.isBlobField) {
+ // Mark blob fields with metadata for identification
+ new MetadataBuilder()
+ .withMetadata(commentMetadata)
.putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchemaType.BLOB.name())
+ .build()
+ } else {
+ commentMetadata
+ }
StructField(f.name(), schemaType.dataType, schemaType.nullable, metadata)
}
SchemaType(StructType(fields.toSeq), nullable = false)
Review Comment:
💬 **SUGGESTION**: This works as long as blobs are always nested inside a parent record in practice, i.e. `catalystType` itself is never directly a BLOB. This is an acceptable limitation; just worth documenting explicitly.
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -1883,6 +1985,127 @@ public int hashCode() {
}
}
+ static class BlobLogicalType extends LogicalType {
+
+ private static final String BLOB_LOGICAL_TYPE_NAME = "blob";
+ // Eager initialization of singleton
+ private static final BlobLogicalType INSTANCE = new BlobLogicalType();
+
+ private BlobLogicalType() {
+ super(BlobLogicalType.BLOB_LOGICAL_TYPE_NAME);
+ }
+
+ public static BlobLogicalType blob() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void validate(Schema schema) {
+ super.validate(schema);
+ if (schema.getType() != Schema.Type.RECORD) {
+ throw new IllegalArgumentException("Blob logical type can only be
applied to RECORD schemas, got: " + schema.getType());
+ }
+ if (!schema.getFields().equals(HoodieSchema.Blob.BLOB_FIELDS)) {
+ throw new IllegalArgumentException("Blob logical type cannot be
applied to schema: " + schema);
+ }
+ }
Review Comment:
💬 **SUGGESTION**:
```java
if (!schema.getFields().equals(HoodieSchema.Blob.BLOB_FIELDS)) {
```
Here we are relying on field ordering and Avro's Field `equals()`. Should we explicitly compare the individual fields like we did for the Spark/Flink converters?
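e.g., a rough sketch of an order-insensitive, field-by-field check (the comparison criteria here are illustrative, not necessarily what the Spark/Flink converters do):
```java
// Sketch: compare the fields by name and type instead of relying on
// Avro's Field.equals() and the ordering of the field list.
if (schema.getFields().size() != HoodieSchema.Blob.BLOB_FIELDS.size()) {
  throw new IllegalArgumentException("Blob logical type cannot be applied to schema: " + schema);
}
for (Schema.Field expected : HoodieSchema.Blob.BLOB_FIELDS) {
  Schema.Field actual = schema.getField(expected.name());
  if (actual == null || actual.schema().getType() != expected.schema().getType()) {
    throw new IllegalArgumentException("Blob logical type cannot be applied to schema: " + schema);
  }
}
```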
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -941,6 +963,86 @@ public HoodieSchema getNonNullType() {
return HoodieSchema.createUnion(nonNullTypes);
}
+ boolean containsBlobType() {
+ if (getType() == HoodieSchemaType.BLOB) {
+ return true;
+ } else if (getType() == HoodieSchemaType.ARRAY) {
+ return getElementType().containsBlobType();
+ } else if (getType() == HoodieSchemaType.MAP) {
+ return getValueType().containsBlobType();
+ } else if (getType() == HoodieSchemaType.UNION) {
+ return getTypes().stream().anyMatch(HoodieSchema::containsBlobType);
+ } else if (hasFields()) {
+ return getFields().stream().anyMatch(field -> field.schema().containsBlobType());
+ }
+ return false;
+ }
+
+ /**
+ * A convenience method to check if the current field represents a blob type.
+ * This checks if the current schema is a BLOB or if it is an ARRAY or MAP whose element or value type is a BLOB, respectively.
+ * It does not check for BLOB types nested within unions or record fields.
+ * @return true if the current schema is a BLOB or an ARRAY/MAP of BLOBs, false otherwise
+ */
+ public boolean isBlobField() {
+ HoodieSchema nonNullSchema = getNonNullType();
+ HoodieSchemaType nonNullSchemaType = nonNullSchema.getType();
+ return nonNullSchemaType == HoodieSchemaType.BLOB
+ || (nonNullSchemaType == HoodieSchemaType.ARRAY && nonNullSchema.getElementType().getNonNullType().getType() == HoodieSchemaType.BLOB)
+ || (nonNullSchemaType == HoodieSchemaType.MAP && nonNullSchema.getValueType().getNonNullType().getType() == HoodieSchemaType.BLOB);
+ }
+
+ /**
+ * Validates that the schema does not contain variants with shredded blob types.
+ * This method recursively traverses the schema tree to check for invalid structures.
+ *
+ * @param schema the schema to validate
+ * @throws HoodieSchemaException if a variant's typed_value field is or contains a BLOB type
+ */
+ private static void validateNoBlobsInVariant(HoodieSchema schema) {
+ if (schema == null) {
+ return;
+ }
+
+ HoodieSchemaType type = schema.getType();
+
+ switch (type) {
+ case ARRAY:
+ HoodieSchema elementType = schema.getElementType();
+ validateNoBlobsInVariant(elementType);
+ break;
+ case MAP:
+ HoodieSchema valueType = schema.getValueType();
+ validateNoBlobsInVariant(valueType);
+ break;
+ case VARIANT:
+ HoodieSchema.Variant variantSchema = (HoodieSchema.Variant) schema;
+ variantSchema.getTypedValueField().ifPresent(typedValueField -> {
+ if (typedValueField.getNonNullType().containsBlobType()) {
+ throw new HoodieSchemaException("Variant typed_value field cannot
be or contain a BLOB type");
+ }
+ });
+ break;
+ case RECORD:
+ // Validate all record fields
+ List<HoodieSchemaField> fields = schema.getFields();
+ for (HoodieSchemaField field : fields) {
+ validateNoBlobsInVariant(field.schema());
+ }
+ break;
+ case UNION:
+ // Validate all union types
+ List<HoodieSchema> types = schema.getTypes();
+ for (HoodieSchema unionType : types) {
+ validateNoBlobsInVariant(unionType);
+ }
+ break;
+ // For primitives, BLOB, ENUM, FIXED, NULL - no nested validation needed
+ default:
+ break;
+ }
+ }
Review Comment:
💅 **NIT: `validateNoBlobsInVariant` is called on every `fromAvroSchema` call**
This adds a recursive traversal on every schema parse. For schemas without variants, that traversal is wasted work. Since this validation only matters for variant+blob combinations, it could be gated on whether the schema actually contains a variant, or piggybacked on another traversal of this nature that we already do. The performance impact is negligible for smaller/common schemas, but worth considering for large schemas with many nested types.
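For example, a rough sketch of the gating idea (`containsVariantType` is hypothetical here; it would mirror the existing `containsBlobType` traversal):
```java
// In the fromAvroSchema path, after parsing (sketch): only pay for the
// recursive blob-in-variant validation when the schema actually contains
// a VARIANT somewhere.
if (hoodieSchema.containsVariantType()) { // hypothetical helper
  validateNoBlobsInVariant(hoodieSchema);
}
```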
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2047,14 +2047,15 @@ private static boolean isColumnTypeSupportedV1(HoodieSchema schema, Option<Hoodi
// HUDI-8585 will add support for BYTES and FIXED
return type != HoodieSchemaType.RECORD && type != HoodieSchemaType.ARRAY && type != HoodieSchemaType.MAP
&& type != HoodieSchemaType.ENUM && type != HoodieSchemaType.BYTES && type != HoodieSchemaType.FIXED
- && type != HoodieSchemaType.DECIMAL; // DECIMAL's underlying type is BYTES
+ && type != HoodieSchemaType.BLOB && type != HoodieSchemaType.DECIMAL; // DECIMAL's underlying type is BYTES
Review Comment:
💅 **NIT** fix or remove the trailing comment
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala:
##########
@@ -18,12 +18,15 @@
package org.apache.spark.sql.avro
import org.apache.hudi.avro.model.HoodieMetadataColumnStats
-import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaField, HoodieSchemaType}
import org.apache.avro.JsonProperties
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.apache.spark.sql.types.{DataTypes, MetadataBuilder, StructField, StructType}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
import org.junit.jupiter.api.Test
+import java.util
+
class TestSchemaConverters {
Review Comment:
We should house these tests in the right places. Need not happen in this PR per se.
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -630,6 +648,10 @@ public static HoodieSchema.Variant createVariantShredded(String name, String nam
return new HoodieSchema.Variant(recordSchema);
}
+ public static HoodieSchema.Blob createBlob() {
+ return new HoodieSchema.Blob(Blob.DEFAULT_NAME);
+ }
Review Comment:
⚠️ **IMPORTANT**
Does using `DEFAULT_NAME` work when there are multiple BLOB fields in the schema? Or do we need a way to pass in the name, as we do for variant/decimal?
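e.g., a name-accepting overload along these lines (a sketch; whether a namespace parameter is also needed is an open question):
```java
// Sketch: mirror the variant/decimal factories by letting callers name the
// blob record, keeping the no-arg version for the single-blob case.
public static HoodieSchema.Blob createBlob() {
  return createBlob(Blob.DEFAULT_NAME);
}

public static HoodieSchema.Blob createBlob(String name) {
  return new HoodieSchema.Blob(name);
}
```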
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -941,6 +963,86 @@ public HoodieSchema getNonNullType() {
return HoodieSchema.createUnion(nonNullTypes);
}
+ boolean containsBlobType() {
+ if (getType() == HoodieSchemaType.BLOB) {
+ return true;
+ } else if (getType() == HoodieSchemaType.ARRAY) {
+ return getElementType().containsBlobType();
+ } else if (getType() == HoodieSchemaType.MAP) {
+ return getValueType().containsBlobType();
+ } else if (getType() == HoodieSchemaType.UNION) {
+ return getTypes().stream().anyMatch(HoodieSchema::containsBlobType);
+ } else if (hasFields()) {
+ return getFields().stream().anyMatch(field -> field.schema().containsBlobType());
+ }
+ return false;
+ }
+
+ /**
+ * A convenience method to check if the current field represents a blob type.
+ * This checks if the current schema is a BLOB or if it is an ARRAY or MAP whose element or value type is a BLOB, respectively.
+ * It does not check for BLOB types nested within unions or record fields.
+ * @return true if the current schema is a BLOB or an ARRAY/MAP of BLOBs, false otherwise
+ */
+ public boolean isBlobField() {
+ HoodieSchema nonNullSchema = getNonNullType();
+ HoodieSchemaType nonNullSchemaType = nonNullSchema.getType();
+ return nonNullSchemaType == HoodieSchemaType.BLOB
+ || (nonNullSchemaType == HoodieSchemaType.ARRAY && nonNullSchema.getElementType().getNonNullType().getType() == HoodieSchemaType.BLOB)
+ || (nonNullSchemaType == HoodieSchemaType.MAP && nonNullSchema.getValueType().getNonNullType().getType() == HoodieSchemaType.BLOB);
+ }
+
+ /**
+ * Validates that the schema does not contain variants with shredded blob types.
+ * This method recursively traverses the schema tree to check for invalid structures.
+ *
+ * @param schema the schema to validate
+ * @throws HoodieSchemaException if a variant's typed_value field is or contains a BLOB type
+ */
+ private static void validateNoBlobsInVariant(HoodieSchema schema) {
+ if (schema == null) {
+ return;
+ }
+
+ HoodieSchemaType type = schema.getType();
+
+ switch (type) {
+ case ARRAY:
+ HoodieSchema elementType = schema.getElementType();
+ validateNoBlobsInVariant(elementType);
+ break;
+ case MAP:
+ HoodieSchema valueType = schema.getValueType();
+ validateNoBlobsInVariant(valueType);
+ break;
+ case VARIANT:
+ HoodieSchema.Variant variantSchema = (HoodieSchema.Variant) schema;
+ variantSchema.getTypedValueField().ifPresent(typedValueField -> {
+ if (typedValueField.getNonNullType().containsBlobType()) {
+ throw new HoodieSchemaException("Variant typed_value field cannot
be or contain a BLOB type");
+ }
+ });
+ break;
+ case RECORD:
+ // Validate all record fields
+ List<HoodieSchemaField> fields = schema.getFields();
Review Comment:
💅 **NIT**
Can we use one-line lambdas, e.g. `schema.getFields().forEach(field -> validateNoBlobsInVariant(field.schema()));`?
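Both recursive cases could collapse the same way, e.g.:
```java
case RECORD:
  // Validate all record fields
  schema.getFields().forEach(field -> validateNoBlobsInVariant(field.schema()));
  break;
case UNION:
  // Validate all union branches
  schema.getTypes().forEach(HoodieSchema::validateNoBlobsInVariant);
  break;
```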
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]