vinothchandar commented on code in PR #18108:
URL: https://github.com/apache/hudi/pull/18108#discussion_r2827752859


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -79,20 +80,25 @@ object HoodieSparkSchemaConverters {
 
       // Complex types
       case ArrayType(elementType, containsNull) =>
-        val elementSchema = toHoodieType(elementType, containsNull, recordName, nameSpace)
+        val elementSchema = toHoodieType(elementType, containsNull, recordName, nameSpace, metadata)
         HoodieSchema.createArray(elementSchema)
 
       case MapType(StringType, valueType, valueContainsNull) =>
-        val valueSchema = toHoodieType(valueType, valueContainsNull, recordName, nameSpace)
+        val valueSchema = toHoodieType(valueType, valueContainsNull, recordName, nameSpace, metadata)
         HoodieSchema.createMap(valueSchema)
 
+      case st: StructType if metadata.contains(HoodieSchema.TYPE_METADATA_FIELD) &&

Review Comment:
   💅 **NIT**
   
   rename `st` => `blobStruct` 



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -941,6 +963,86 @@ public HoodieSchema getNonNullType() {
     return HoodieSchema.createUnion(nonNullTypes);
   }
 
+  boolean containsBlobType() {
+    if (getType() == HoodieSchemaType.BLOB) {
+      return true;
+    } else if (getType() == HoodieSchemaType.ARRAY) {
+      return getElementType().containsBlobType();
+    } else if (getType() == HoodieSchemaType.MAP) {
+      return getValueType().containsBlobType();
+    } else if (getType() == HoodieSchemaType.UNION) {
+      return getTypes().stream().anyMatch(HoodieSchema::containsBlobType);
+    } else if (hasFields()) {
+      return getFields().stream().anyMatch(field -> field.schema().containsBlobType());
+    }
+    return false;
+  }
+
+  /**
+   * A convenience method to check if the current field represents a blob type.
+   * This checks if the current schema is a BLOB or if it is an ARRAY or MAP whose element or value type is a BLOB, respectively.
+   * It does not check for BLOB types nested within unions or record fields.
+   * @return true if the current schema is a BLOB or an ARRAY/MAP of BLOBs, false otherwise
+   */
+  public boolean isBlobField() {
+    HoodieSchema nonNullSchema = getNonNullType();
+    HoodieSchemaType nonNullSchemaType = nonNullSchema.getType();
+    return nonNullSchemaType == HoodieSchemaType.BLOB
+        || (nonNullSchemaType == HoodieSchemaType.ARRAY && nonNullSchema.getElementType().getNonNullType().getType() == HoodieSchemaType.BLOB)
+        || (nonNullSchemaType == HoodieSchemaType.MAP && nonNullSchema.getValueType().getNonNullType().getType() == HoodieSchemaType.BLOB);
+  }
+
+  /**
+   * Validates that the schema does not contain variants with shredded blob types.
+   * This method recursively traverses the schema tree to check for invalid structures.
+   *
+   * @param schema the schema to validate
+   * @throws HoodieSchemaException if a variant's typed_value field is or contains a BLOB type
+   */
+  private static void validateNoBlobsInVariant(HoodieSchema schema) {

Review Comment:
   💅 **NIT** 
   
   rename to `validateNoBlobsInsideVariant` .. on first read, I parsed it as `invariant`



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -190,11 +196,21 @@ object HoodieSparkSchemaConverters {
         val newRecordNames = existingRecordNames + fullName
         val fields = hoodieSchema.getFields.asScala.map { f =>
           val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
-          val metadata = if (f.doc().isPresent && !f.doc().get().isEmpty) {
+          val commentMetadata = if (f.doc().isPresent && !f.doc().get().isEmpty) {
             new MetadataBuilder().putString("comment", f.doc().get()).build()
           } else {
             Metadata.empty
           }
+          val fieldSchema = f.getNonNullSchema
+          val metadata = if (fieldSchema.isBlobField) {
+            // Mark blob fields with metadata for identification
+            new MetadataBuilder()
+              .withMetadata(commentMetadata)
+              .putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchemaType.BLOB.name())
+              .build()
+          } else {
+            commentMetadata
+          }
          StructField(f.name(), schemaType.dataType, schemaType.nullable, metadata)
         }
         SchemaType(StructType(fields.toSeq), nullable = false)

Review Comment:
   💬 **SUGGESTION**: This works as long as blobs are always nested inside a parent record in practice, i.e. `catalystType` itself is never directly a BLOB. This is an acceptable limitation, just worth documenting explicitly.



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -1883,6 +1985,127 @@ public int hashCode() {
     }
   }
 
+  static class BlobLogicalType extends LogicalType {
+
+    private static final String BLOB_LOGICAL_TYPE_NAME = "blob";
+    // Eager initialization of singleton
+    private static final BlobLogicalType INSTANCE = new BlobLogicalType();
+
+    private BlobLogicalType() {
+      super(BlobLogicalType.BLOB_LOGICAL_TYPE_NAME);
+    }
+
+    public static BlobLogicalType blob() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      if (schema.getType() != Schema.Type.RECORD) {
+        throw new IllegalArgumentException("Blob logical type can only be 
applied to RECORD schemas, got: " + schema.getType());
+      }
+      if (!schema.getFields().equals(HoodieSchema.Blob.BLOB_FIELDS)) {
+        throw new IllegalArgumentException("Blob logical type cannot be 
applied to schema: " + schema);
+      }
+    }

Review Comment:
   💬 **SUGGESTION**: 
   ```java
   if (!schema.getFields().equals(HoodieSchema.Blob.BLOB_FIELDS)) {
   ```
   
   Here we are relying on order and field equals() from avro. Should we explicitly compare the individual fields, like we did for the Spark/Flink converters?
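   
   For reference, a rough order-independent sketch (untested; assumes `BLOB_FIELDS` holds the expected Avro `Schema.Field`s), looking each expected field up by name and comparing schemas:
   ```java
   // Sketch: compare by field name and schema, avoiding reliance on field
   // order and on Avro Field.equals() (which also compares defaults and props).
   if (schema.getFields().size() != HoodieSchema.Blob.BLOB_FIELDS.size()) {
     throw new IllegalArgumentException("Blob logical type cannot be applied to schema: " + schema);
   }
   for (Schema.Field expected : HoodieSchema.Blob.BLOB_FIELDS) {
     Schema.Field actual = schema.getField(expected.name());
     if (actual == null || !actual.schema().equals(expected.schema())) {
       throw new IllegalArgumentException("Blob logical type cannot be applied to schema: " + schema);
     }
   }
   ```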



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -941,6 +963,86 @@ public HoodieSchema getNonNullType() {
     return HoodieSchema.createUnion(nonNullTypes);
   }
 
+  boolean containsBlobType() {
+    if (getType() == HoodieSchemaType.BLOB) {
+      return true;
+    } else if (getType() == HoodieSchemaType.ARRAY) {
+      return getElementType().containsBlobType();
+    } else if (getType() == HoodieSchemaType.MAP) {
+      return getValueType().containsBlobType();
+    } else if (getType() == HoodieSchemaType.UNION) {
+      return getTypes().stream().anyMatch(HoodieSchema::containsBlobType);
+    } else if (hasFields()) {
+      return getFields().stream().anyMatch(field -> field.schema().containsBlobType());
+    }
+    return false;
+  }
+
+  /**
+   * A convenience method to check if the current field represents a blob type.
+   * This checks if the current schema is a BLOB or if it is an ARRAY or MAP whose element or value type is a BLOB, respectively.
+   * It does not check for BLOB types nested within unions or record fields.
+   * @return true if the current schema is a BLOB or an ARRAY/MAP of BLOBs, false otherwise
+   */
+  public boolean isBlobField() {
+    HoodieSchema nonNullSchema = getNonNullType();
+    HoodieSchemaType nonNullSchemaType = nonNullSchema.getType();
+    return nonNullSchemaType == HoodieSchemaType.BLOB
+        || (nonNullSchemaType == HoodieSchemaType.ARRAY && nonNullSchema.getElementType().getNonNullType().getType() == HoodieSchemaType.BLOB)
+        || (nonNullSchemaType == HoodieSchemaType.MAP && nonNullSchema.getValueType().getNonNullType().getType() == HoodieSchemaType.BLOB);
+  }
+
+  /**
+   * Validates that the schema does not contain variants with shredded blob types.
+   * This method recursively traverses the schema tree to check for invalid structures.
+   *
+   * @param schema the schema to validate
+   * @throws HoodieSchemaException if a variant's typed_value field is or contains a BLOB type
+   */
+  private static void validateNoBlobsInVariant(HoodieSchema schema) {
+    if (schema == null) {
+      return;
+    }
+
+    HoodieSchemaType type = schema.getType();
+
+    switch (type) {
+      case ARRAY:
+        HoodieSchema elementType = schema.getElementType();
+        validateNoBlobsInVariant(elementType);
+        break;
+      case MAP:
+        HoodieSchema valueType = schema.getValueType();
+        validateNoBlobsInVariant(valueType);
+        break;
+      case VARIANT:
+        HoodieSchema.Variant variantSchema = (HoodieSchema.Variant) schema;
+        variantSchema.getTypedValueField().ifPresent(typedValueField -> {
+          if (typedValueField.getNonNullType().containsBlobType()) {
+            throw new HoodieSchemaException("Variant typed_value field cannot be or contain a BLOB type");
+          }
+        });
+        break;
+      case RECORD:
+        // Validate all record fields
+        List<HoodieSchemaField> fields = schema.getFields();
+        for (HoodieSchemaField field : fields) {
+          validateNoBlobsInVariant(field.schema());
+        }
+        break;
+      case UNION:
+        // Validate all union types
+        List<HoodieSchema> types = schema.getTypes();
+        for (HoodieSchema unionType : types) {
+          validateNoBlobsInVariant(unionType);
+        }
+        break;
+      // For primitives, BLOB, ENUM, FIXED, NULL - no nested validation needed
+      default:
+        break;
+    }
+  }

Review Comment:
   💅 **NIT: `validateNoBlobsInVariant` is called on every `fromAvroSchema` call**
   
   This adds a recursive traversal on every schema parse. For schemas without variants, this traversal is wasted work. Since this validation only matters for variant+blob combinations, it could be gated on whether the schema actually contains a variant, or piggybacked on another traversal we already do over the schema. Performance impact is negligible for smaller/common schemas, but worth considering for large schemas with many nested types.
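   
   One possible shape (sketch only; `containsVariantType()` is a hypothetical helper mirroring the `containsBlobType()` above):
   ```java
   // Hypothetical short-circuiting variant probe, same shape as containsBlobType().
   boolean containsVariantType() {
     if (getType() == HoodieSchemaType.VARIANT) {
       return true;
     } else if (getType() == HoodieSchemaType.ARRAY) {
       return getElementType().containsVariantType();
     } else if (getType() == HoodieSchemaType.MAP) {
       return getValueType().containsVariantType();
     } else if (getType() == HoodieSchemaType.UNION) {
       return getTypes().stream().anyMatch(HoodieSchema::containsVariantType);
     } else if (hasFields()) {
       return getFields().stream().anyMatch(field -> field.schema().containsVariantType());
     }
     return false;
   }

   // At the parse call site: only pay for the blob-in-variant traversal
   // when a VARIANT is actually present in the schema.
   if (schema.containsVariantType()) {
     validateNoBlobsInVariant(schema);
   }
   ```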



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2047,14 +2047,15 @@ private static boolean isColumnTypeSupportedV1(HoodieSchema schema, Option<Hoodi
     // HUDI-8585 will add support for BYTES and FIXED
     return type != HoodieSchemaType.RECORD && type != HoodieSchemaType.ARRAY && type != HoodieSchemaType.MAP
         && type != HoodieSchemaType.ENUM && type != HoodieSchemaType.BYTES && type != HoodieSchemaType.FIXED
-        && type != HoodieSchemaType.DECIMAL; // DECIMAL's underlying type is BYTES
+        && type != HoodieSchemaType.BLOB && type != HoodieSchemaType.DECIMAL; // DECIMAL's underlying type is BYTES

Review Comment:
   💅 **NIT** fix or remove comment



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala:
##########
@@ -18,12 +18,15 @@
 package org.apache.spark.sql.avro
 
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats
-import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaField, HoodieSchemaType}
 
 import org.apache.avro.JsonProperties
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.apache.spark.sql.types.{DataTypes, MetadataBuilder, StructField, StructType}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
 import org.junit.jupiter.api.Test
 
+import java.util
+
 class TestSchemaConverters {

Review Comment:
   We should house these tests in the right places. Need not happen in this PR per se.



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -630,6 +648,10 @@ public static HoodieSchema.Variant 
createVariantShredded(String name, String nam
     return new HoodieSchema.Variant(recordSchema);
   }
 
+  public static HoodieSchema.Blob createBlob() {
+    return new HoodieSchema.Blob(Blob.DEFAULT_NAME);
+  }

Review Comment:
   ⚠️ **IMPORTANT** 
   Does using the `DEFAULT_NAME` work when there are multiple BLOB fields in the schema? Or do we need a way to pass in the name, as we do for variant/decimal?
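   
   If a name is needed, one possible shape (sketch only; mirrors the `createVariantShredded(name, ...)` factory above and assumes the same `Blob(String)` constructor used by the no-arg version):
   ```java
   // Hypothetical overload: keep the no-arg factory as a convenience that
   // delegates with DEFAULT_NAME, and let callers pass a distinct record name.
   public static HoodieSchema.Blob createBlob() {
     return createBlob(Blob.DEFAULT_NAME);
   }

   public static HoodieSchema.Blob createBlob(String name) {
     return new HoodieSchema.Blob(name);
   }
   ```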



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -941,6 +963,86 @@ public HoodieSchema getNonNullType() {
     return HoodieSchema.createUnion(nonNullTypes);
   }
 
+  boolean containsBlobType() {
+    if (getType() == HoodieSchemaType.BLOB) {
+      return true;
+    } else if (getType() == HoodieSchemaType.ARRAY) {
+      return getElementType().containsBlobType();
+    } else if (getType() == HoodieSchemaType.MAP) {
+      return getValueType().containsBlobType();
+    } else if (getType() == HoodieSchemaType.UNION) {
+      return getTypes().stream().anyMatch(HoodieSchema::containsBlobType);
+    } else if (hasFields()) {
+      return getFields().stream().anyMatch(field -> field.schema().containsBlobType());
+    }
+    return false;
+  }
+
+  /**
+   * A convenience method to check if the current field represents a blob type.
+   * This checks if the current schema is a BLOB or if it is an ARRAY or MAP whose element or value type is a BLOB, respectively.
+   * It does not check for BLOB types nested within unions or record fields.
+   * @return true if the current schema is a BLOB or an ARRAY/MAP of BLOBs, false otherwise
+   */
+  public boolean isBlobField() {
+    HoodieSchema nonNullSchema = getNonNullType();
+    HoodieSchemaType nonNullSchemaType = nonNullSchema.getType();
+    return nonNullSchemaType == HoodieSchemaType.BLOB
+        || (nonNullSchemaType == HoodieSchemaType.ARRAY && nonNullSchema.getElementType().getNonNullType().getType() == HoodieSchemaType.BLOB)
+        || (nonNullSchemaType == HoodieSchemaType.MAP && nonNullSchema.getValueType().getNonNullType().getType() == HoodieSchemaType.BLOB);
+  }
+
+  /**
+   * Validates that the schema does not contain variants with shredded blob types.
+   * This method recursively traverses the schema tree to check for invalid structures.
+   *
+   * @param schema the schema to validate
+   * @throws HoodieSchemaException if a variant's typed_value field is or contains a BLOB type
+   */
+  private static void validateNoBlobsInVariant(HoodieSchema schema) {
+    if (schema == null) {
+      return;
+    }
+
+    HoodieSchemaType type = schema.getType();
+
+    switch (type) {
+      case ARRAY:
+        HoodieSchema elementType = schema.getElementType();
+        validateNoBlobsInVariant(elementType);
+        break;
+      case MAP:
+        HoodieSchema valueType = schema.getValueType();
+        validateNoBlobsInVariant(valueType);
+        break;
+      case VARIANT:
+        HoodieSchema.Variant variantSchema = (HoodieSchema.Variant) schema;
+        variantSchema.getTypedValueField().ifPresent(typedValueField -> {
+          if (typedValueField.getNonNullType().containsBlobType()) {
+            throw new HoodieSchemaException("Variant typed_value field cannot be or contain a BLOB type");
+          }
+        });
+        break;
+      case RECORD:
+        // Validate all record fields
+        List<HoodieSchemaField> fields = schema.getFields();

Review Comment:
   💅 **NIT** 
   
   Can we do one-line lambdas, like `schema.getFields().forEach(field -> validateNoBlobsInVariant(field.schema()));`?
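   
   Applied to both recursive cases, roughly:
   ```java
   case RECORD:
     schema.getFields().forEach(field -> validateNoBlobsInVariant(field.schema()));
     break;
   case UNION:
     schema.getTypes().forEach(HoodieSchema::validateNoBlobsInVariant);
     break;
   ```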



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
