Copilot commented on code in PR #18108:
URL: https://github.com/apache/hudi/pull/18108#discussion_r2805071988


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -79,20 +80,24 @@ object HoodieSparkSchemaConverters {
 
       // Complex types
       case ArrayType(elementType, containsNull) =>
-        val elementSchema = toHoodieType(elementType, containsNull, recordName, nameSpace)
+        val elementSchema = toHoodieType(elementType, containsNull, recordName, nameSpace, metadata)
         HoodieSchema.createArray(elementSchema)
 
       case MapType(StringType, valueType, valueContainsNull) =>
-        val valueSchema = toHoodieType(valueType, valueContainsNull, recordName, nameSpace)
+        val valueSchema = toHoodieType(valueType, valueContainsNull, recordName, nameSpace, metadata)
         HoodieSchema.createMap(valueSchema)
 
+      case _: StructType if metadata.contains(HoodieSchema.TYPE_METADATA_FIELD) &&
+        metadata.getString(HoodieSchema.TYPE_METADATA_FIELD).equalsIgnoreCase(HoodieSchemaType.BLOB.name()) =>
+        // Handle blob struct type
+        HoodieSchema.createBlob()

Review Comment:
   Blob detection during Spark -> Hudi conversion is based solely on `StructField` metadata (`hudi_type=BLOB`) and does not validate that the `StructType` actually matches the expected blob structure (type/data/reference...). This can silently accept incompatible schemas and reconstruct them as `HoodieSchema.createBlob()`. Consider validating the struct’s fields/types before returning a BLOB schema (or failing fast with an informative error).
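   A minimal fail-fast sketch of that validation, assuming the blob struct exposes `type`, `data`, and `reference` fields (field names inferred from this comment, not verified against `createBlob()`):
   ```scala
   // Hypothetical guard: check that the tagged struct actually looks like a
   // blob before reconstructing it as HoodieSchema.createBlob(); deeper
   // per-field type checks could be layered on top of this.
   case st: StructType if metadata.contains(HoodieSchema.TYPE_METADATA_FIELD) &&
     metadata.getString(HoodieSchema.TYPE_METADATA_FIELD).equalsIgnoreCase(HoodieSchemaType.BLOB.name()) =>
     val expected = Set("type", "data", "reference") // assumed blob field names
     val actual = st.fieldNames.toSet
     if (!expected.subsetOf(actual)) {
       throw new IllegalArgumentException(
         s"Field tagged as BLOB does not match the expected blob struct; missing fields: ${expected.diff(actual).mkString(", ")}")
     }
     HoodieSchema.createBlob()
   ```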



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaType.java:
##########
@@ -156,6 +161,8 @@ public static HoodieSchemaType fromAvro(Schema avroSchema) {
         return UUID;
       } else if (logicalType instanceof VariantLogicalType) {
         return VARIANT;
+      } else if (logicalType == HoodieSchema.BlobLogicalType.blob()) {

Review Comment:
   `fromAvro` checks the blob logical type using reference equality (`logicalType == HoodieSchema.BlobLogicalType.blob()`). This can fail if an Avro schema with `logicalType:"blob"` was parsed before the custom logical type was registered, since Avro may create a different LogicalType instance. Use a more robust check (e.g., `instanceof BlobLogicalType` or a name-based comparison) so blob detection doesn’t depend on initialization/parse order.
   ```suggestion
         } else if (logicalType instanceof HoodieSchema.BlobLogicalType) {
   ```



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -152,20 +159,28 @@ public static HoodieSchema fromAvroSchema(Schema avroSchema) {
     if (avroSchema == null) {
       return null;
     }
+    HoodieSchema schema;
     LogicalType logicalType = avroSchema.getLogicalType();
     if (logicalType != null) {
       if (logicalType instanceof LogicalTypes.Decimal) {
-        return new HoodieSchema.Decimal(avroSchema);
+        schema = new HoodieSchema.Decimal(avroSchema);
      } else if (logicalType instanceof LogicalTypes.TimeMillis || logicalType instanceof LogicalTypes.TimeMicros) {
-        return new HoodieSchema.Time(avroSchema);
+        schema = new HoodieSchema.Time(avroSchema);
      } else if (logicalType instanceof LogicalTypes.TimestampMillis || logicalType instanceof LogicalTypes.TimestampMicros
          || logicalType instanceof LogicalTypes.LocalTimestampMillis || logicalType instanceof LogicalTypes.LocalTimestampMicros) {
-        return new HoodieSchema.Timestamp(avroSchema);
+        schema = new HoodieSchema.Timestamp(avroSchema);
       } else if (logicalType == VariantLogicalType.variant()) {
-        return new HoodieSchema.Variant(avroSchema);
+        schema = new HoodieSchema.Variant(avroSchema);
+      } else if (logicalType == BlobLogicalType.blob()) {

Review Comment:
   `fromAvroSchema` uses reference equality (`logicalType == BlobLogicalType.blob()`) to detect the blob logical type. If the Avro schema was parsed before `LogicalTypes.register("blob", ...)` ran, `avroSchema.getLogicalType()` may not be the singleton instance and blob detection will fail. Prefer `instanceof BlobLogicalType` or checking `logicalType.getName()` to make detection independent of registration/parse ordering.
   ```suggestion
         } else if (logicalType instanceof BlobLogicalType) {
   ```



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -54,7 +54,8 @@ object HoodieSparkSchemaConverters {
   def toHoodieType(catalystType: DataType,
                    nullable: Boolean = false,
                    recordName: String = "topLevelRecord",
-                   nameSpace: String = ""): HoodieSchema = {
+                   nameSpace: String = "",
+                   metadata: Metadata = Metadata.empty): HoodieSchema = {

Review Comment:
   `toHoodieType` gained a new `metadata` parameter, which is a binary-incompatible API change for any already-compiled callers (Scala/Java) expecting the previous 4-arg signature. Consider adding an overload with the old signature delegating to the new method (passing `Metadata.empty`) to preserve binary compatibility while still enabling blob metadata handling.
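   A sketch of that shim (note: Scala allows default arguments on only one overloaded alternative, so this 4-arg overload deliberately omits them):
   ```scala
   // Hypothetical binary-compatibility overload: re-expose the previous
   // 4-arg signature and delegate to the new 5-arg method with empty metadata.
   def toHoodieType(catalystType: DataType,
                    nullable: Boolean,
                    recordName: String,
                    nameSpace: String): HoodieSchema =
     toHoodieType(catalystType, nullable, recordName, nameSpace, Metadata.empty)
   ```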



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -941,6 +964,64 @@ public HoodieSchema getNonNullType() {
     return HoodieSchema.createUnion(nonNullTypes);
   }
 
+  boolean containsBlobType() {
+    if (getType() == HoodieSchemaType.BLOB) {
+      return true;
+    } else if (getType() == HoodieSchemaType.UNION) {
+      return getTypes().stream().anyMatch(HoodieSchema::containsBlobType);
+    } else if (getType() == HoodieSchemaType.RECORD) {

Review Comment:
   `containsBlobType` only descends into `UNION` and `RECORD`, but not `VARIANT` even though `VARIANT` can contain nested schemas (e.g., shredded variant `typed_value`). This means blob types nested under VARIANT won’t be detected, allowing arrays/maps that indirectly contain blobs to bypass the new validations. Consider traversing any schema that `hasFields()` (RECORD/VARIANT/BLOB) instead of only RECORD, and update `validateNoBlobsInArrayOrMap` accordingly.
   ```suggestion
       } else if (hasFields()) {
   ```



##########
hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java:
##########
@@ -1689,4 +1690,221 @@ public void testVariantLogicalTypeSingleton() {
     // Verify the logical type name
     assertEquals("variant", instance1.getName());
   }
+
+  // ==================== Blob Type Tests ====================
+
+  @Test
+  public void testCreateBlob() {
+    HoodieSchema.Blob blob = HoodieSchema.createBlob();
+
+    assertNotNull(blob);
+    assertEquals("blob", blob.getName());
+    assertEquals(HoodieSchemaType.BLOB, blob.getType());
+
+    // Verify storage_type field
+    Option<HoodieSchemaField> storageTypeOpt = blob.getField("type");
+    assertTrue(storageTypeOpt.isPresent());
+    HoodieSchemaField storageTypeField = storageTypeOpt.get();
+    assertEquals(HoodieSchemaType.ENUM, storageTypeField.schema().getType());
+    assertEquals(Arrays.asList("INLINE", "OUT_OF_LINE"), storageTypeField.schema().getEnumSymbols());
+    assertFalse(storageTypeField.schema().isNullable());
+
+    // Verify data field is nullable
+    Option<HoodieSchemaField> dataOpt = blob.getField("data");
+    assertTrue(dataOpt.isPresent());
+    HoodieSchemaField dataField = dataOpt.get();
+    assertTrue(dataField.schema().isNullable());
+    assertEquals(HoodieSchemaType.BYTES, dataField.schema().getNonNullType().getType());
+
+    // Verify reference field is nullable
+    Option<HoodieSchemaField> refOpt = blob.getField("reference");
+    assertTrue(refOpt.isPresent());
+    HoodieSchemaField refField = refOpt.get();
+    assertTrue(refField.schema().isNullable());
+    assertEquals(HoodieSchemaType.RECORD, refField.schema().getNonNullType().getType());
+
+    HoodieSchema refSchema = refOpt.get().schema().getNonNullType();
+    assertEquals(HoodieSchemaType.RECORD, refSchema.getType());
+
+    // Verify reference has all required fields
+    Option<HoodieSchemaField> externalPathOpt = refSchema.getField("external_path");
+    assertTrue(externalPathOpt.isPresent());
+    assertEquals(HoodieSchemaType.STRING, externalPathOpt.get().schema().getType());
+    assertFalse(externalPathOpt.get().schema().isNullable());
+
+    Option<HoodieSchemaField> offsetOpt = refSchema.getField("offset");
+    assertTrue(offsetOpt.isPresent());
+    assertEquals(HoodieSchemaType.LONG, offsetOpt.get().schema().getNonNullType().getType());
+    assertTrue(offsetOpt.get().schema().isNullable());
+
+    Option<HoodieSchemaField> lengthOpt = refSchema.getField("length");
+    assertTrue(lengthOpt.isPresent());
+    assertEquals(HoodieSchemaType.LONG, lengthOpt.get().schema().getNonNullType().getType());
+    assertTrue(lengthOpt.get().schema().isNullable());
+
+    Option<HoodieSchemaField> managedOpt = refSchema.getField("managed");
+    assertTrue(managedOpt.isPresent());
+    assertEquals(HoodieSchemaType.BOOLEAN, managedOpt.get().schema().getType());
+    assertFalse(managedOpt.get().schema().isNullable());
+  }
+
+  @Test
+  public void testBlobLogicalTypeDetection() {
+    HoodieSchema.Blob blob = HoodieSchema.createBlob();
+
+    // Verify logical type is set
+    Schema avroSchema = blob.toAvroSchema();
+    assertNotNull(avroSchema.getLogicalType());
+    assertEquals("blob", avroSchema.getLogicalType().getName());
+
+    // Verify it can be detected back
+    HoodieSchema reconstructed = HoodieSchema.fromAvroSchema(avroSchema);
+    assertInstanceOf(HoodieSchema.Blob.class, reconstructed);
+    assertEquals(HoodieSchemaType.BLOB, reconstructed.getType());
+  }
+
+  @Test
+  public void testBlobAsRecordField() {
+    HoodieSchema recordSchema = HoodieSchema.createRecord("test_record", null, null, Arrays.asList(
+        HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
+        HoodieSchemaField.of("file_data", HoodieSchema.createBlob())
+    ));
+
+    Option<HoodieSchemaField> blobFieldOpt = recordSchema.getField("file_data");
+    assertTrue(blobFieldOpt.isPresent());
+    HoodieSchemaField blobField = blobFieldOpt.get();
+    assertEquals(HoodieSchemaType.BLOB, blobField.schema().getType());
+
+    // Verify the blob field has proper structure
+    HoodieSchema blobSchema = blobField.schema();
+    assertInstanceOf(HoodieSchema.Blob.class, blobSchema);
+
+    // Validate the blob schema can be converted to string and back without losing the logical type
+    String recordJson = recordSchema.toString();
+    HoodieSchema parsedRecord = HoodieSchema.parse(recordJson);
+    Option<HoodieSchemaField> parsedBlobFieldOpt = parsedRecord.getField("file_data");
+    assertTrue(parsedBlobFieldOpt.isPresent());
+    HoodieSchemaField parsedBlobField = parsedBlobFieldOpt.get();
+    assertInstanceOf(HoodieSchema.Blob.class, parsedBlobField.schema());
+    assertEquals(HoodieSchemaType.BLOB, parsedBlobField.schema().getType());
+  }
+
+  private static final String BLOB_JSON = "{"
+      + "\"type\":\"record\","
+      + "\"name\":\"blob\","
+      + "\"logicalType\":\"blob\","
+      + "\"fields\":["
+      + "  {\"name\":\"storage_type\",\"type\":\"string\"},"
+      + "  {\"name\":\"data\",\"type\":[\"null\",\"bytes\"],\"default\":null},"
+      + "  
{\"name\":\"reference\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"reference\",\"fields\":["
+      + "    {\"name\":\"external_path\",\"type\":\"string\"},"
+      + "    {\"name\":\"offset\",\"type\":\"long\"},"
+      + "    {\"name\":\"length\",\"type\":\"long\"},"
+      + "    {\"name\":\"managed\",\"type\":\"boolean\"}"
+      + "  ]}],\"default\":null}"
+      + "]"
+      + "}";

Review Comment:
   `BLOB_JSON` does not match the blob schema produced by `HoodieSchema.createBlob()`: it uses field name `storage_type` with type `string`, while `createBlob()` defines field `HoodieSchema.Blob.TYPE` (currently `"type"`) as an enum with symbols `[INLINE, OUT_OF_LINE]`. As written, the parse test can succeed while exercising a structurally-invalid blob schema, which weakens the validation coverage. Consider updating `BLOB_JSON` to mirror `createBlob()` (field names + enum vs string) and/or adding schema-structure validation in `HoodieSchema.fromAvroSchema` for blobs.
   ```suggestion
    private static final String BLOB_JSON = HoodieSchema.createBlob().toString();
   ```



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -1883,6 +1968,107 @@ public int hashCode() {
     }
   }
 
+  static class BlobLogicalType extends LogicalType {
+
+    private static final String BLOB_LOGICAL_TYPE_NAME = "blob";
+    // Eager initialization of singleton
+    private static final BlobLogicalType INSTANCE = new BlobLogicalType();
+
+    private BlobLogicalType() {
+      super(BlobLogicalType.BLOB_LOGICAL_TYPE_NAME);
+    }
+
+    public static BlobLogicalType blob() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      if (schema.getType() != Schema.Type.RECORD) {
+        throw new IllegalArgumentException("Blob logical type can only be applied to RECORD schemas, got: " + schema.getType());
+      }
+    }
+  }
+
+  /**
+   * Factory for creating VariantLogicalType instances.
+   */
+  private static class BlogLogicalTypeFactory implements LogicalTypes.LogicalTypeFactory {
+    @Override
+    public LogicalType fromSchema(Schema schema) {
+      return BlobLogicalType.blob();
+    }
+
+    @Override
+    public String getTypeName() {
+      return BlobLogicalType.BLOB_LOGICAL_TYPE_NAME;
+    }

Review Comment:
   There are a couple of blob logical-type naming/doc typos here: the factory 
class is named `BlogLogicalTypeFactory` (looks like it should be 
`BlobLogicalTypeFactory`), and its Javadoc says it creates `VariantLogicalType` 
instances even though it’s for blob. Renaming/fixing the Javadoc will make the 
new logical type easier to understand and avoid confusion later.



##########
hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaType.java:
##########
@@ -84,6 +84,8 @@ public void testComplexTypes() {
     assertTrue(HoodieSchemaType.ARRAY.isComplex(), "ARRAY should be complex");
     assertTrue(HoodieSchemaType.MAP.isComplex(), "MAP should be complex");
     assertTrue(HoodieSchemaType.UNION.isComplex(), "UNION should be complex");
+    assertTrue(HoodieSchemaType.VARIANT.isComplex(), "VARIANT should be complex");
+    assertTrue(HoodieSchemaType.BLOB.isComplex(), "TIMESTAMP should be complex");

Review Comment:
   The assertion message refers to TIMESTAMP, but the assertion is about BLOB 
being complex. This looks like a copy/paste typo and makes failures harder to 
interpret.
   ```suggestion
       assertTrue(HoodieSchemaType.BLOB.isComplex(), "BLOB should be complex");
   ```


