Copilot commented on code in PR #18108:
URL: https://github.com/apache/hudi/pull/18108#discussion_r2805071988
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -79,20 +80,24 @@ object HoodieSparkSchemaConverters {
       // Complex types
       case ArrayType(elementType, containsNull) =>
-        val elementSchema = toHoodieType(elementType, containsNull, recordName, nameSpace)
+        val elementSchema = toHoodieType(elementType, containsNull, recordName, nameSpace, metadata)
         HoodieSchema.createArray(elementSchema)
       case MapType(StringType, valueType, valueContainsNull) =>
-        val valueSchema = toHoodieType(valueType, valueContainsNull, recordName, nameSpace)
+        val valueSchema = toHoodieType(valueType, valueContainsNull, recordName, nameSpace, metadata)
         HoodieSchema.createMap(valueSchema)
+      case _: StructType if metadata.contains(HoodieSchema.TYPE_METADATA_FIELD) &&
+          metadata.getString(HoodieSchema.TYPE_METADATA_FIELD).equalsIgnoreCase(HoodieSchemaType.BLOB.name()) =>
+        // Handle blob struct type
+        HoodieSchema.createBlob()
Review Comment:
Blob detection during Spark -> Hudi conversion is based solely on
`StructField` metadata (`hudi_type=BLOB`) and does not validate that the
`StructType` actually matches the expected blob structure
(type/data/reference...). This can silently accept incompatible schemas and
reconstruct them as `HoodieSchema.createBlob()`. Consider validating the
struct’s fields/types before returning a BLOB schema (or failing fast with an
informative error).
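
A hedged sketch of the structural check this suggests (the helper name `validateBlobStruct` and the expected field set are assumptions inferred from the blob fields asserted in the tests below, not an API from this PR):

```scala
import org.apache.spark.sql.types.StructType

// Sketch only: verify that a Catalyst struct tagged hudi_type=BLOB actually has
// the canonical blob layout before it is reconstructed as HoodieSchema.createBlob().
// The expected field names (type, data, reference) mirror createBlob()'s tests.
def validateBlobStruct(st: StructType): Unit = {
  val expected = Set("type", "data", "reference")
  val actual = st.fieldNames.toSet
  if (actual != expected) {
    throw new IllegalArgumentException(
      s"Struct tagged as BLOB does not match the expected blob structure: " +
        s"expected fields $expected, found $actual")
  }
}
```

The match arm would bind the struct (`case st: StructType if ...`) and call `validateBlobStruct(st)` before returning `HoodieSchema.createBlob()`.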
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaType.java:
##########
@@ -156,6 +161,8 @@ public static HoodieSchemaType fromAvro(Schema avroSchema) {
       return UUID;
     } else if (logicalType instanceof VariantLogicalType) {
       return VARIANT;
+    } else if (logicalType == HoodieSchema.BlobLogicalType.blob()) {
Review Comment:
`fromAvro` checks blob logical type using reference equality (`logicalType
== HoodieSchema.BlobLogicalType.blob()`). This can fail if an Avro schema with
`logicalType:"blob"` was parsed before the custom logical type was registered,
since Avro may create a different LogicalType instance. Use a more robust check
(e.g., `instanceof BlobLogicalType` or name-based comparison) so blob detection
doesn’t depend on initialization/parse order.
```suggestion
    } else if (logicalType instanceof HoodieSchema.BlobLogicalType) {
```
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -152,20 +159,28 @@ public static HoodieSchema fromAvroSchema(Schema avroSchema) {
     if (avroSchema == null) {
       return null;
     }
+    HoodieSchema schema;
     LogicalType logicalType = avroSchema.getLogicalType();
     if (logicalType != null) {
       if (logicalType instanceof LogicalTypes.Decimal) {
-        return new HoodieSchema.Decimal(avroSchema);
+        schema = new HoodieSchema.Decimal(avroSchema);
       } else if (logicalType instanceof LogicalTypes.TimeMillis || logicalType instanceof LogicalTypes.TimeMicros) {
-        return new HoodieSchema.Time(avroSchema);
+        schema = new HoodieSchema.Time(avroSchema);
       } else if (logicalType instanceof LogicalTypes.TimestampMillis || logicalType instanceof LogicalTypes.TimestampMicros
           || logicalType instanceof LogicalTypes.LocalTimestampMillis || logicalType instanceof LogicalTypes.LocalTimestampMicros) {
-        return new HoodieSchema.Timestamp(avroSchema);
+        schema = new HoodieSchema.Timestamp(avroSchema);
       } else if (logicalType == VariantLogicalType.variant()) {
-        return new HoodieSchema.Variant(avroSchema);
+        schema = new HoodieSchema.Variant(avroSchema);
+      } else if (logicalType == BlobLogicalType.blob()) {
Review Comment:
`fromAvroSchema` uses reference equality (`logicalType ==
BlobLogicalType.blob()`) to detect the blob logical type. If the Avro schema
was parsed before `LogicalTypes.register("blob", ...)` ran,
`avroSchema.getLogicalType()` may not be the singleton instance and blob
detection will fail. Prefer `instanceof BlobLogicalType` or checking
`logicalType.getName()` to make detection independent of registration/parse
ordering.
```suggestion
      } else if (logicalType instanceof BlobLogicalType) {
```
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -54,7 +54,8 @@ object HoodieSparkSchemaConverters {
   def toHoodieType(catalystType: DataType,
                    nullable: Boolean = false,
                    recordName: String = "topLevelRecord",
-                   nameSpace: String = ""): HoodieSchema = {
+                   nameSpace: String = "",
+                   metadata: Metadata = Metadata.empty): HoodieSchema = {
Review Comment:
`toHoodieType` gained a new `metadata` parameter, which is a
binary-incompatible API change for any already-compiled callers (Scala/Java)
expecting the previous 4-arg signature. Consider adding an overload with the
old signature delegating to the new method (passing `Metadata.empty`) to
preserve binary compatibility while still enabling blob metadata handling.
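
A minimal sketch of that overload (assuming the five-arg method keeps its default arguments; note that Scala default arguments preserve source compatibility but not binary compatibility on their own):

```scala
// Sketch only: re-expose the previous 4-arg entry point so already-compiled
// Scala/Java callers keep linking; it simply delegates with Metadata.empty.
def toHoodieType(catalystType: DataType,
                 nullable: Boolean,
                 recordName: String,
                 nameSpace: String): HoodieSchema =
  toHoodieType(catalystType, nullable, recordName, nameSpace, Metadata.empty)
```

Since only one overloaded alternative may declare default arguments in Scala, the defaults would stay on the five-arg method.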
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -941,6 +964,64 @@ public HoodieSchema getNonNullType() {
     return HoodieSchema.createUnion(nonNullTypes);
   }
+  boolean containsBlobType() {
+    if (getType() == HoodieSchemaType.BLOB) {
+      return true;
+    } else if (getType() == HoodieSchemaType.UNION) {
+      return getTypes().stream().anyMatch(HoodieSchema::containsBlobType);
+    } else if (getType() == HoodieSchemaType.RECORD) {
Review Comment:
`containsBlobType` only descends into `UNION` and `RECORD`, but not
`VARIANT` even though `VARIANT` can contain nested schemas (e.g., shredded
variant `typed_value`). This means blob types nested under VARIANT won’t be
detected, allowing arrays/maps that indirectly contain blobs to bypass the new
validations. Consider traversing any schema that `hasFields()`
(RECORD/VARIANT/BLOB) instead of only RECORD, and update
`validateNoBlobsInArrayOrMap` accordingly.
```suggestion
    } else if (hasFields()) {
```
##########
hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java:
##########
@@ -1689,4 +1690,221 @@ public void testVariantLogicalTypeSingleton() {
     // Verify the logical type name
     assertEquals("variant", instance1.getName());
   }
+
+  // ==================== Blob Type Tests ====================
+
+  @Test
+  public void testCreateBlob() {
+    HoodieSchema.Blob blob = HoodieSchema.createBlob();
+
+    assertNotNull(blob);
+    assertEquals("blob", blob.getName());
+    assertEquals(HoodieSchemaType.BLOB, blob.getType());
+
+    // Verify storage_type field
+    Option<HoodieSchemaField> storageTypeOpt = blob.getField("type");
+    assertTrue(storageTypeOpt.isPresent());
+    HoodieSchemaField storageTypeField = storageTypeOpt.get();
+    assertEquals(HoodieSchemaType.ENUM, storageTypeField.schema().getType());
+    assertEquals(Arrays.asList("INLINE", "OUT_OF_LINE"), storageTypeField.schema().getEnumSymbols());
+    assertFalse(storageTypeField.schema().isNullable());
+
+    // Verify data field is nullable
+    Option<HoodieSchemaField> dataOpt = blob.getField("data");
+    assertTrue(dataOpt.isPresent());
+    HoodieSchemaField dataField = dataOpt.get();
+    assertTrue(dataField.schema().isNullable());
+    assertEquals(HoodieSchemaType.BYTES, dataField.schema().getNonNullType().getType());
+
+    // Verify reference field is nullable
+    Option<HoodieSchemaField> refOpt = blob.getField("reference");
+    assertTrue(refOpt.isPresent());
+    HoodieSchemaField refField = refOpt.get();
+    assertTrue(refField.schema().isNullable());
+    assertEquals(HoodieSchemaType.RECORD, refField.schema().getNonNullType().getType());
+
+    HoodieSchema refSchema = refOpt.get().schema().getNonNullType();
+    assertEquals(HoodieSchemaType.RECORD, refSchema.getType());
+
+    // Verify reference has all required fields
+    Option<HoodieSchemaField> externalPathOpt = refSchema.getField("external_path");
+    assertTrue(externalPathOpt.isPresent());
+    assertEquals(HoodieSchemaType.STRING, externalPathOpt.get().schema().getType());
+    assertFalse(externalPathOpt.get().schema().isNullable());
+
+    Option<HoodieSchemaField> offsetOpt = refSchema.getField("offset");
+    assertTrue(offsetOpt.isPresent());
+    assertEquals(HoodieSchemaType.LONG, offsetOpt.get().schema().getNonNullType().getType());
+    assertTrue(offsetOpt.get().schema().isNullable());
+
+    Option<HoodieSchemaField> lengthOpt = refSchema.getField("length");
+    assertTrue(lengthOpt.isPresent());
+    assertEquals(HoodieSchemaType.LONG, lengthOpt.get().schema().getNonNullType().getType());
+    assertTrue(lengthOpt.get().schema().isNullable());
+
+    Option<HoodieSchemaField> managedOpt = refSchema.getField("managed");
+    assertTrue(managedOpt.isPresent());
+    assertEquals(HoodieSchemaType.BOOLEAN, managedOpt.get().schema().getType());
+    assertFalse(managedOpt.get().schema().isNullable());
+  }
+
+  @Test
+  public void testBlobLogicalTypeDetection() {
+    HoodieSchema.Blob blob = HoodieSchema.createBlob();
+
+    // Verify logical type is set
+    Schema avroSchema = blob.toAvroSchema();
+    assertNotNull(avroSchema.getLogicalType());
+    assertEquals("blob", avroSchema.getLogicalType().getName());
+
+    // Verify it can be detected back
+    HoodieSchema reconstructed = HoodieSchema.fromAvroSchema(avroSchema);
+    assertInstanceOf(HoodieSchema.Blob.class, reconstructed);
+    assertEquals(HoodieSchemaType.BLOB, reconstructed.getType());
+  }
+
+  @Test
+  public void testBlobAsRecordField() {
+    HoodieSchema recordSchema = HoodieSchema.createRecord("test_record", null, null, Arrays.asList(
+        HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
+        HoodieSchemaField.of("file_data", HoodieSchema.createBlob())
+    ));
+
+    Option<HoodieSchemaField> blobFieldOpt = recordSchema.getField("file_data");
+    assertTrue(blobFieldOpt.isPresent());
+    HoodieSchemaField blobField = blobFieldOpt.get();
+    assertEquals(HoodieSchemaType.BLOB, blobField.schema().getType());
+
+    // Verify the blob field has proper structure
+    HoodieSchema blobSchema = blobField.schema();
+    assertInstanceOf(HoodieSchema.Blob.class, blobSchema);
+
+    // Validate the blob schema can be converted to string and back without losing the logical type
+    String recordJson = recordSchema.toString();
+    HoodieSchema parsedRecord = HoodieSchema.parse(recordJson);
+    Option<HoodieSchemaField> parsedBlobFieldOpt = parsedRecord.getField("file_data");
+    assertTrue(parsedBlobFieldOpt.isPresent());
+    HoodieSchemaField parsedBlobField = parsedBlobFieldOpt.get();
+    assertInstanceOf(HoodieSchema.Blob.class, parsedBlobField.schema());
+    assertEquals(HoodieSchemaType.BLOB, parsedBlobField.schema().getType());
+  }
+
+  private static final String BLOB_JSON = "{"
+      + "\"type\":\"record\","
+      + "\"name\":\"blob\","
+      + "\"logicalType\":\"blob\","
+      + "\"fields\":["
+      + " {\"name\":\"storage_type\",\"type\":\"string\"},"
+      + " {\"name\":\"data\",\"type\":[\"null\",\"bytes\"],\"default\":null},"
+      + " {\"name\":\"reference\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"reference\",\"fields\":["
+      + " {\"name\":\"external_path\",\"type\":\"string\"},"
+      + " {\"name\":\"offset\",\"type\":\"long\"},"
+      + " {\"name\":\"length\",\"type\":\"long\"},"
+      + " {\"name\":\"managed\",\"type\":\"boolean\"}"
+      + " ]}],\"default\":null}"
+      + "]"
+      + "}";
Review Comment:
`BLOB_JSON` does not match the blob schema produced by
`HoodieSchema.createBlob()`: it uses field name `storage_type` with type
`string`, while `createBlob()` defines field `HoodieSchema.Blob.TYPE`
(currently `"type"`) as an enum with symbols `[INLINE, OUT_OF_LINE]`. As
written, the parse test can succeed while exercising a structurally invalid
blob schema, which weakens the validation coverage. Consider updating
`BLOB_JSON` to mirror `createBlob()` (field names + enum vs string) and/or
adding schema-structure validation in `HoodieSchema.fromAvroSchema` for blobs.
```suggestion
  private static final String BLOB_JSON = HoodieSchema.createBlob().toString();
```
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -1883,6 +1968,107 @@ public int hashCode() {
     }
   }
+  static class BlobLogicalType extends LogicalType {
+
+    private static final String BLOB_LOGICAL_TYPE_NAME = "blob";
+    // Eager initialization of singleton
+    private static final BlobLogicalType INSTANCE = new BlobLogicalType();
+
+    private BlobLogicalType() {
+      super(BlobLogicalType.BLOB_LOGICAL_TYPE_NAME);
+    }
+
+    public static BlobLogicalType blob() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      if (schema.getType() != Schema.Type.RECORD) {
+        throw new IllegalArgumentException("Blob logical type can only be applied to RECORD schemas, got: " + schema.getType());
+      }
+    }
+  }
+
+  /**
+   * Factory for creating VariantLogicalType instances.
+   */
+  private static class BlogLogicalTypeFactory implements LogicalTypes.LogicalTypeFactory {
+    @Override
+    public LogicalType fromSchema(Schema schema) {
+      return BlobLogicalType.blob();
+    }
+
+    @Override
+    public String getTypeName() {
+      return BlobLogicalType.BLOB_LOGICAL_TYPE_NAME;
+    }
Review Comment:
There are a couple of blob logical-type naming/doc typos here: the factory
class is named `BlogLogicalTypeFactory` (looks like it should be
`BlobLogicalTypeFactory`), and its Javadoc says it creates `VariantLogicalType`
instances even though it’s for blob. Renaming/fixing the Javadoc will make the
new logical type easier to understand and avoid confusion later.
##########
hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaType.java:
##########
@@ -84,6 +84,8 @@ public void testComplexTypes() {
     assertTrue(HoodieSchemaType.ARRAY.isComplex(), "ARRAY should be complex");
     assertTrue(HoodieSchemaType.MAP.isComplex(), "MAP should be complex");
     assertTrue(HoodieSchemaType.UNION.isComplex(), "UNION should be complex");
+    assertTrue(HoodieSchemaType.VARIANT.isComplex(), "VARIANT should be complex");
+    assertTrue(HoodieSchemaType.BLOB.isComplex(), "TIMESTAMP should be complex");
Review Comment:
The assertion message refers to TIMESTAMP, but the assertion is about BLOB
being complex. This looks like a copy/paste typo and makes failures harder to
interpret.
```suggestion
    assertTrue(HoodieSchemaType.BLOB.isComplex(), "BLOB should be complex");
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]