This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9bb88cf935b0 feat(blob): Blob schema definition (#18108)
9bb88cf935b0 is described below
commit 9bb88cf935b0175f6b46325062da48b4acf8ed26
Author: Tim Brown <[email protected]>
AuthorDate: Fri Feb 20 06:57:24 2026 -0800
feat(blob): Blob schema definition (#18108)
* basic definition and converter
* initial testing
* update type handling
* fix handling of maps/arrays
* fix scalastyle
* fix nullability expectations, fix type handling
* fix hasFields for blob
* avoid col stats on blob
* add restrictions on blob column, update naming
* fix test setup
* update type name
* update test
* address feedback
* add struct validation on conversion
* add validation
* fix typo
* Revert "add restrictions on blob column, update naming"
This reverts commit 63e4383108466da4a1982cad1a13fd70db8f5145.
* fix map/array handling in spark, add nullable tests
* remove restriction from rfc
* update flink schema convert
* update schema evolution and compatibility checkers
* add back validation
* simplify test cases
* address comments
* separate types onto new lines
---
.../apache/hudi/util/HoodieSchemaConverter.java | 105 ++++++++++
.../hudi/util/TestHoodieSchemaConverter.java | 157 ++++++++++++++
.../hudi/io/storage/HoodieSparkParquetReader.java | 3 +-
.../sql/avro/HoodieSparkSchemaConverters.scala | 49 ++++-
.../apache/hudi/common/schema/HoodieSchema.java | 176 +++++++++++++++-
.../HoodieSchemaComparatorForSchemaEvolution.java | 5 +
.../schema/HoodieSchemaCompatibilityChecker.java | 3 +-
.../schema/HoodieSchemaProjectionChecker.java | 1 +
.../hudi/common/schema/HoodieSchemaType.java | 8 +
.../hudi/metadata/HoodieTableMetadataUtil.java | 6 +-
.../hudi/common/schema/TestHoodieSchema.java | 225 +++++++++++++++++++++
...stHoodieSchemaComparatorForSchemaEvolution.java | 109 ++++++++++
.../schema/TestHoodieSchemaCompatibility.java | 89 ++++++++
.../hudi/common/schema/TestHoodieSchemaType.java | 5 +
.../spark/sql/avro/TestSchemaConverters.scala | 162 ++++++++++++++-
.../org/apache/hudi/hive/TestSparkSchemaUtils.java | 40 ++++
.../hudi/sync/common/util/SparkSchemaUtils.java | 14 +-
.../deltastreamer/TestSourceFormatAdapter.java | 3 +-
rfc/rfc-100/rfc-100.md | 4 -
19 files changed, 1139 insertions(+), 25 deletions(-)
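For reviewers who want to try the new API, here is a minimal sketch of declaring a blob column, based on the test code further down in this patch (the record and field names are illustrative, and the import locations are assumed from the file paths in the diff):

import java.util.Arrays;

import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;

public class BlobSchemaSketch {
  public static void main(String[] args) {
    // A record with an int key and a nullable blob column, mirroring TestHoodieSchema below.
    HoodieSchema recordWithBlob = HoodieSchema.createRecord("example_record", null, null,
        Arrays.asList(
            HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
            HoodieSchemaField.of("payload", HoodieSchema.createNullable(HoodieSchema.createBlob()))));

    // The blob column reports the new BLOB type and is flagged by isBlobField().
    HoodieSchema payload = recordWithBlob.getField("payload").get().schema();
    System.out.println(payload.getNonNullType().getType()); // BLOB
    System.out.println(payload.isBlobField());              // true
  }
}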
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
index 5e211c4851a0..d53c6fed1b1c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
@@ -41,6 +42,7 @@ import org.apache.flink.table.types.logical.TimeType;
import org.apache.flink.table.types.logical.TimestampType;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@@ -175,6 +177,13 @@ public class HoodieSchemaConverter {
case ROW:
RowType rowType = (RowType) logicalType;
+
+ // Check if this is a BLOB structure
+ if (isBlobStructure(rowType)) {
+ schema = HoodieSchema.createBlob();
+ break;
+ }
+
List<String> fieldNames = rowType.getFieldNames();
List<HoodieSchemaField> hoodieFields = new ArrayList<>();
@@ -252,6 +261,82 @@ public class HoodieSchemaConverter {
return logicalType.getTypeRoot().getFamilies().contains(family);
}
+ /**
+ * Detects if a Flink RowType represents a BLOB structure by validating it matches the schema defined in {@link HoodieSchema.Blob}.
+ */
+ private static boolean isBlobStructure(RowType rowType) {
+ // Validate: 3 fields with exact names
+ if (rowType.getFieldCount() != HoodieSchema.Blob.getFieldCount()) {
+ return false;
+ }
+ List<String> fieldNames = rowType.getFieldNames();
+ if (!fieldNames.equals(Arrays.asList(
+ HoodieSchema.Blob.TYPE,
+ HoodieSchema.Blob.INLINE_DATA_FIELD,
+ HoodieSchema.Blob.EXTERNAL_REFERENCE))) {
+ return false;
+ }
+
+ // Validate 'type' field: non-null STRING
+ LogicalType typeField = rowType.getTypeAt(0);
+ if (!isFamily(typeField, LogicalTypeFamily.CHARACTER_STRING) || typeField.isNullable()) {
+ return false;
+ }
+
+ // Validate 'data' field: nullable BYTES/VARBINARY
+ LogicalType dataField = rowType.getTypeAt(1);
+ if (dataField.getTypeRoot() != LogicalTypeRoot.BINARY && dataField.getTypeRoot() != LogicalTypeRoot.VARBINARY) {
+ return false;
+ }
+ if (!dataField.isNullable()) {
+ return false;
+ }
+
+ // Validate 'reference' field: nullable ROW
+ LogicalType referenceField = rowType.getTypeAt(2);
+ if (!referenceField.isNullable() || referenceField.getTypeRoot() != LogicalTypeRoot.ROW) {
+ return false;
+ }
+
+ // Validate reference sub-structure
+ RowType referenceRow = (RowType) referenceField;
+ if (referenceRow.getFieldCount() != HoodieSchema.Blob.getReferenceFieldCount()) {
+ return false;
+ }
+ List<String> refFieldNames = referenceRow.getFieldNames();
+ if (!refFieldNames.equals(Arrays.asList(
+ HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH,
+ HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET,
+ HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH,
+ HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED))) {
+ return false;
+ }
+
+ // Validate reference sub-field types
+ // external_path: non-null STRING
+ if (!isFamily(referenceRow.getTypeAt(0), LogicalTypeFamily.CHARACTER_STRING)
+ || referenceRow.getTypeAt(0).isNullable()) {
+ return false;
+ }
+ // offset: nullable BIGINT
+ if (referenceRow.getTypeAt(1).getTypeRoot() != LogicalTypeRoot.BIGINT
+ || !referenceRow.getTypeAt(1).isNullable()) {
+ return false;
+ }
+ // length: nullable BIGINT
+ if (referenceRow.getTypeAt(2).getTypeRoot() != LogicalTypeRoot.BIGINT
+ || !referenceRow.getTypeAt(2).isNullable()) {
+ return false;
+ }
+ // managed: non-null BOOLEAN
+ if (referenceRow.getTypeAt(3).getTypeRoot() != LogicalTypeRoot.BOOLEAN
+ || referenceRow.getTypeAt(3).isNullable()) {
+ return false;
+ }
+
+ return true;
+ }
+
/**
* Computes minimum bytes needed for decimal precision.
* This ensures compatibility with Avro fixed-size decimal representation.
@@ -316,6 +401,8 @@ public class HoodieSchemaConverter {
return convertTimestamp(hoodieSchema);
case UUID:
return DataTypes.STRING().notNull();
+ case BLOB:
+ return createBlob();
case ARRAY:
return convertArray(hoodieSchema);
case MAP:
@@ -380,6 +467,24 @@ public class HoodieSchemaConverter {
return DataTypes.TIME(flinkPrecision).notNull();
}
+ private static DataType createBlob() {
+ // Create nested reference ROW type
+ DataType referenceType = DataTypes.ROW(
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH, DataTypes.STRING().notNull()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET, DataTypes.BIGINT().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH, DataTypes.BIGINT().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED, DataTypes.BOOLEAN().notNull())
+ ).nullable();
+
+ // Create top-level BLOB ROW type
+ // Note: 'type' field is ENUM, converted to STRING in Flink
+ return DataTypes.ROW(
+ DataTypes.FIELD(HoodieSchema.Blob.TYPE, DataTypes.STRING().notNull()),
+ DataTypes.FIELD(HoodieSchema.Blob.INLINE_DATA_FIELD, DataTypes.BYTES().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE, referenceType)
+ ).notNull();
+ }
+
private static DataType convertRecord(HoodieSchema schema) {
List<HoodieSchemaField> fields = schema.getFields();
DataTypes.Field[] flinkFields = new DataTypes.Field[fields.size()];
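As a quick orientation for the Flink changes above, a minimal sketch of the round trip the new tests exercise (class and package locations assumed from this diff):

import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.util.HoodieSchemaConverter;

public class FlinkBlobRoundTripSketch {
  public static void main(String[] args) {
    // Hudi BLOB -> Flink ROW<type, data, reference>
    DataType flinkType = HoodieSchemaConverter.convertToDataType(HoodieSchema.createBlob());
    RowType rowType = (RowType) flinkType.getLogicalType();
    System.out.println(rowType.getFieldNames()); // [type, data, reference]

    // Flink ROW -> Hudi BLOB, recovered via the isBlobStructure() shape check added above
    HoodieSchema back = HoodieSchemaConverter.convertToSchema(rowType);
    System.out.println(back.getType() == HoodieSchemaType.BLOB); // true
  }
}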
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
index a6fc063857eb..7c75bebc123f 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
@@ -40,6 +40,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -530,4 +531,160 @@ public class TestHoodieSchemaConverter {
+ "`f_localtimestamp_micros` TIMESTAMP_LTZ(6)>";
assertEquals(expectedDataType, convertedDataType.toString());
}
+
+ @Test
+ public void testBlobTypeRoundTrip() {
+ // Create a BLOB HoodieSchema
+ HoodieSchema blobSchema = HoodieSchema.createBlob();
+ assertEquals(HoodieSchemaType.BLOB, blobSchema.getType());
+
+ // Convert to Flink DataType
+ DataType dataType = HoodieSchemaConverter.convertToDataType(blobSchema);
+ assertNotNull(dataType);
+
+ // Verify it's a ROW structure with correct fields
+ RowType rowType = (RowType) dataType.getLogicalType();
+ assertEquals(3, rowType.getFieldCount());
+ assertEquals(Arrays.asList(
+ HoodieSchema.Blob.TYPE,
+ HoodieSchema.Blob.INLINE_DATA_FIELD,
+ HoodieSchema.Blob.EXTERNAL_REFERENCE),
+ rowType.getFieldNames());
+
+ // Convert back to HoodieSchema
+ HoodieSchema convertedSchema = HoodieSchemaConverter.convertToSchema(rowType);
+ assertEquals(HoodieSchemaType.BLOB, convertedSchema.getType());
+ assertInstanceOf(HoodieSchema.Blob.class, convertedSchema);
+ }
+
+ @Test
+ public void testNullableBlobRoundTrip() {
+ // Create a nullable BLOB schema
+ HoodieSchema nullableBlob = HoodieSchema.createNullable(HoodieSchema.createBlob());
+ assertTrue(nullableBlob.isNullable());
+ assertEquals(HoodieSchemaType.UNION, nullableBlob.getType());
+
+ // Convert to Flink DataType
+ DataType dataType = HoodieSchemaConverter.convertToDataType(nullableBlob);
+ assertNotNull(dataType);
+ assertTrue(dataType.getLogicalType().isNullable());
+
+ // Verify underlying type is BLOB structure
+ RowType rowType = (RowType) dataType.getLogicalType();
+ assertEquals(3, rowType.getFieldCount());
+
+ // Convert back to HoodieSchema
+ HoodieSchema convertedSchema = HoodieSchemaConverter.convertToSchema(dataType.getLogicalType());
+ assertTrue(convertedSchema.isNullable());
+
+ // Verify underlying type is BLOB
+ HoodieSchema nonNullSchema = convertedSchema.getNonNullType();
+ assertEquals(HoodieSchemaType.BLOB, nonNullSchema.getType());
+ }
+
+ @Test
+ public void testBlobInNestedStructures() {
+ // Test 1: BLOB field within RECORD structure
+ HoodieSchema recordWithBlob = HoodieSchema.createRecord(
+ "test_record",
+ null,
+ null,
+ Arrays.asList(
+ HoodieSchemaField.of("id",
HoodieSchema.create(HoodieSchemaType.INT)),
+ HoodieSchemaField.of("blob_data", HoodieSchema.createBlob())
+ )
+ );
+
+ DataType recordDataType = HoodieSchemaConverter.convertToDataType(recordWithBlob);
+ RowType recordRowType = (RowType) recordDataType.getLogicalType();
+ assertEquals(2, recordRowType.getFieldCount());
+
+ // Verify blob field structure
+ RowType blobFieldType = (RowType) recordRowType.getTypeAt(1);
+ assertEquals(3, blobFieldType.getFieldCount());
+
+ // Round-trip conversion
+ HoodieSchema convertedRecord = HoodieSchemaConverter.convertToSchema(recordRowType, "test_record");
+ assertEquals(HoodieSchemaType.RECORD, convertedRecord.getType());
+ assertEquals(HoodieSchemaType.BLOB, convertedRecord.getFields().get(1).schema().getType());
+
+ // Test 2: ARRAY of BLOBs
+ HoodieSchema arrayOfBlobs = HoodieSchema.createArray(HoodieSchema.createBlob());
+ DataType arrayDataType = HoodieSchemaConverter.convertToDataType(arrayOfBlobs);
+ ArrayType arrayType = (ArrayType) arrayDataType.getLogicalType();
+
+ // Verify element is BLOB structure
+ RowType elementType = (RowType) arrayType.getElementType();
+ assertEquals(3, elementType.getFieldCount());
+
+ // Round-trip
+ HoodieSchema convertedArray = HoodieSchemaConverter.convertToSchema(arrayType);
+ assertEquals(HoodieSchemaType.ARRAY, convertedArray.getType());
+ assertEquals(HoodieSchemaType.BLOB, convertedArray.getElementType().getType());
+
+ // Test 3: MAP with BLOB values
+ HoodieSchema mapWithBlobValues = HoodieSchema.createMap(HoodieSchema.createBlob());
+ DataType mapDataType = HoodieSchemaConverter.convertToDataType(mapWithBlobValues);
+ MapType mapType = (MapType) mapDataType.getLogicalType();
+
+ // Verify value is BLOB structure
+ RowType valueType = (RowType) mapType.getValueType();
+ assertEquals(3, valueType.getFieldCount());
+
+ // Round-trip
+ HoodieSchema convertedMap = HoodieSchemaConverter.convertToSchema(mapType);
+ assertEquals(HoodieSchemaType.MAP, convertedMap.getType());
+ assertEquals(HoodieSchemaType.BLOB, convertedMap.getValueType().getType());
+ }
+
+ @Test
+ public void testBlobStructureValidation() {
+ // Positive case: Create ROW matching BLOB structure
+ DataType blobLikeRow = DataTypes.ROW(
+ DataTypes.FIELD(HoodieSchema.Blob.TYPE, DataTypes.STRING().notNull()),
+ DataTypes.FIELD(HoodieSchema.Blob.INLINE_DATA_FIELD, DataTypes.BYTES().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE, DataTypes.ROW(
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH, DataTypes.STRING().notNull()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET, DataTypes.BIGINT().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH, DataTypes.BIGINT().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED, DataTypes.BOOLEAN().notNull())
+ ).nullable())
+ ).notNull();
+
+ RowType blobLikeRowType = (RowType) blobLikeRow.getLogicalType();
+ HoodieSchema convertedSchema = HoodieSchemaConverter.convertToSchema(blobLikeRowType);
+ assertEquals(HoodieSchemaType.BLOB, convertedSchema.getType());
+
+ // Negative case 1: Different field names
+ DataType differentNames = DataTypes.ROW(
+ DataTypes.FIELD("wrong_name", DataTypes.STRING().notNull()),
+ DataTypes.FIELD("data", DataTypes.BYTES().nullable()),
+ DataTypes.FIELD("reference", DataTypes.ROW(
+ DataTypes.FIELD("external_path", DataTypes.STRING().notNull()),
+ DataTypes.FIELD("offset", DataTypes.BIGINT().nullable()),
+ DataTypes.FIELD("length", DataTypes.BIGINT().nullable()),
+ DataTypes.FIELD("managed", DataTypes.BOOLEAN().notNull())
+ ).nullable())
+ ).notNull();
+
+ RowType differentNamesType = (RowType) differentNames.getLogicalType();
+ HoodieSchema notBlob1 = HoodieSchemaConverter.convertToSchema(differentNamesType);
+ assertEquals(HoodieSchemaType.RECORD, notBlob1.getType()); // Should be RECORD, not BLOB
+
+ // Negative case 2: Wrong field types
+ DataType wrongTypes = DataTypes.ROW(
+ DataTypes.FIELD(HoodieSchema.Blob.TYPE, DataTypes.INT().notNull()), // Wrong type
+ DataTypes.FIELD(HoodieSchema.Blob.INLINE_DATA_FIELD, DataTypes.BYTES().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE, DataTypes.ROW(
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH, DataTypes.STRING().notNull()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET, DataTypes.BIGINT().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH, DataTypes.BIGINT().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED, DataTypes.BOOLEAN().notNull())
+ ).nullable())
+ ).notNull();
+
+ RowType wrongTypesType = (RowType) wrongTypes.getLogicalType();
+ HoodieSchema notBlob2 = HoodieSchemaConverter.convertToSchema(wrongTypesType);
+ assertEquals(HoodieSchemaType.RECORD, notBlob2.getType()); // Should be RECORD, not BLOB
+ }
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index 07fe34f09a27..160a731cece4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -57,6 +57,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaCo
import org.apache.spark.sql.execution.datasources.parquet.SparkBasicSchemaEvolution;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructType;
import java.io.IOException;
@@ -204,7 +205,7 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
MessageType messageType = getFileSchema();
StructType structType = getStructSchema();
schemaOption = Option.of(HoodieSparkSchemaConverters.toHoodieType(
- structType, true, messageType.getName(), StringUtils.EMPTY_STRING));
+ structType, true, messageType.getName(), StringUtils.EMPTY_STRING, Metadata.empty()));
}
return schemaOption.get();
}
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
index 11451f3b7ce0..c0f386089406 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
@@ -54,7 +54,8 @@ object HoodieSparkSchemaConverters {
def toHoodieType(catalystType: DataType,
nullable: Boolean = false,
recordName: String = "topLevelRecord",
- nameSpace: String = ""): HoodieSchema = {
+ nameSpace: String = "",
+ metadata: Metadata = Metadata.empty): HoodieSchema = {
val schema = catalystType match {
// Primitive types
case BooleanType => HoodieSchema.create(HoodieSchemaType.BOOLEAN)
@@ -79,20 +80,25 @@ object HoodieSparkSchemaConverters {
// Complex types
case ArrayType(elementType, containsNull) =>
- val elementSchema = toHoodieType(elementType, containsNull, recordName, nameSpace)
+ val elementSchema = toHoodieType(elementType, containsNull, recordName, nameSpace, metadata)
HoodieSchema.createArray(elementSchema)
case MapType(StringType, valueType, valueContainsNull) =>
- val valueSchema = toHoodieType(valueType, valueContainsNull, recordName, nameSpace)
+ val valueSchema = toHoodieType(valueType, valueContainsNull, recordName, nameSpace, metadata)
HoodieSchema.createMap(valueSchema)
+ case blobStruct: StructType if metadata.contains(HoodieSchema.TYPE_METADATA_FIELD) &&
+ metadata.getString(HoodieSchema.TYPE_METADATA_FIELD).equalsIgnoreCase(HoodieSchemaType.BLOB.name()) =>
+ // Validate blob structure before accepting
+ validateBlobStructure(blobStruct)
+ HoodieSchema.createBlob()
case st: StructType =>
val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName"
else recordName
// Check if this might be a union (using heuristic like Avro converter)
if (canBeUnion(st)) {
val nonNullUnionFieldTypes = st.map { f =>
- toHoodieType(f.dataType, nullable = false, f.name, childNameSpace)
+ toHoodieType(f.dataType, nullable = false, f.name, childNameSpace, f.metadata)
}
val unionFieldTypes = if (nullable) {
(HoodieSchema.create(HoodieSchemaType.NULL) +: nonNullUnionFieldTypes).asJava
@@ -103,7 +109,7 @@ object HoodieSparkSchemaConverters {
} else {
// Create record
val fields = st.map { f =>
- val fieldSchema = toHoodieType(f.dataType, f.nullable, f.name, childNameSpace)
+ val fieldSchema = toHoodieType(f.dataType, f.nullable, f.name, childNameSpace, f.metadata)
val doc = f.getComment.orNull
// Match existing Avro SchemaConverters behavior: use NULL_VALUE for nullable unions
// to avoid serializing "default":null in JSON representation
@@ -178,7 +184,7 @@ object HoodieSparkSchemaConverters {
SchemaType(StringType, nullable = false)
// Complex types
- case HoodieSchemaType.RECORD =>
+ case HoodieSchemaType.BLOB | HoodieSchemaType.RECORD =>
val fullName = hoodieSchema.getFullName
if (existingRecordNames.contains(fullName)) {
throw new IncompatibleSchemaException(
@@ -190,11 +196,22 @@ object HoodieSparkSchemaConverters {
val newRecordNames = existingRecordNames + fullName
val fields = hoodieSchema.getFields.asScala.map { f =>
val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
- val metadata = if (f.doc().isPresent && !f.doc().get().isEmpty) {
+ val commentMetadata = if (f.doc().isPresent && !f.doc().get().isEmpty) {
new MetadataBuilder().putString("comment", f.doc().get()).build()
} else {
Metadata.empty
}
+ val fieldSchema = f.getNonNullSchema
+ val metadata = if (fieldSchema.isBlobField) {
+ // Mark blob fields with metadata for identification.
+ // This assumes blobs are always part of a record and not the top level schema itself
+ new MetadataBuilder()
+ .withMetadata(commentMetadata)
+ .putString(HoodieSchema.TYPE_METADATA_FIELD, HoodieSchemaType.BLOB.name())
+ .build()
+ } else {
+ commentMetadata
+ }
StructField(f.name(), schemaType.dataType, schemaType.nullable, metadata)
}
SchemaType(StructType(fields.toSeq), nullable = false)
@@ -245,6 +262,24 @@ object HoodieSparkSchemaConverters {
}
}
+ private lazy val expectedBlobStructType: StructType = toSqlType(HoodieSchema.createBlob())._1.asInstanceOf[StructType]
+
+ /**
+ * Validates that a StructType matches the expected blob schema structure defined in {@link HoodieSchema.Blob}.
+ *
+ * @param structType the StructType to validate
+ * @throws IllegalArgumentException if the structure does not match the expected blob schema
+ */
+ private def validateBlobStructure(structType: StructType): Unit = {
+ if (!structType.equals(expectedBlobStructType)) {
+ throw new IllegalArgumentException(
+ s"""Invalid blob schema structure. Expected schema:
+ |${expectedBlobStructType.toDDL}
+ |Got schema:
+ |${structType.toDDL}""".stripMargin)
+ }
+ }
+
private def canBeUnion(st: StructType): Boolean = {
st.fields.length > 0 &&
st.forall { f =>
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
index a0c36b25b338..e807576fba99 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.schema;
+import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -43,6 +44,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
+import static org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField;
+
/**
* Wrapper class for Avro Schema that provides Hudi-specific schema functionality
* while maintaining binary compatibility with Avro.
@@ -80,14 +83,18 @@ import java.util.stream.Collectors;
* @since 1.2.0
*/
public class HoodieSchema implements Serializable {
-
+ private static final long serialVersionUID = 1L;
/**
* Constant representing a null JSON value, equivalent to JsonProperties.NULL_VALUE.
* This provides compatibility with Avro's JsonProperties while maintaining Hudi's API.
*/
public static final Object NULL_VALUE = JsonProperties.NULL_VALUE;
public static final HoodieSchema NULL_SCHEMA = HoodieSchema.create(HoodieSchemaType.NULL);
- private static final long serialVersionUID = 1L;
+
+ /**
+ * Constant to use when attaching type metadata to external schema systems like Spark's StructType.
+ */
+ public static final String TYPE_METADATA_FIELD = "hudi_type";
/**
* Constants for Parquet-style accessor patterns used in nested MAP and ARRAY navigation.
@@ -115,6 +122,7 @@ public class HoodieSchema implements Serializable {
// Register the Variant logical type with Avro
static {
LogicalTypes.register(VariantLogicalType.VARIANT_LOGICAL_TYPE_NAME, new VariantLogicalTypeFactory());
+ LogicalTypes.register(BlobLogicalType.BLOB_LOGICAL_TYPE_NAME, new BlobLogicalTypeFactory());
}
/**
@@ -163,6 +171,8 @@ public class HoodieSchema implements Serializable {
return new HoodieSchema.Timestamp(avroSchema);
} else if (logicalType == VariantLogicalType.variant()) {
return new HoodieSchema.Variant(avroSchema);
+ } else if (logicalType == BlobLogicalType.blob()) {
+ return new HoodieSchema.Blob(avroSchema);
}
}
return new HoodieSchema(avroSchema);
@@ -588,6 +598,7 @@ public class HoodieSchema implements Serializable {
* @return a new HoodieSchema.Variant representing a shredded variant
*/
public static HoodieSchema.Variant createVariantShredded(String name, String namespace, String doc, HoodieSchema typedValueSchema) {
+ ValidationUtils.checkArgument(typedValueSchema == null || !typedValueSchema.containsBlobType(), "Typed value cannot be or contain a BLOB type");
String variantName = (name != null && !name.isEmpty()) ? name :
VariantLogicalType.VARIANT_LOGICAL_TYPE_NAME;
List<HoodieSchemaField> fields = new ArrayList<>();
@@ -630,6 +641,10 @@ public class HoodieSchema implements Serializable {
return new HoodieSchema.Variant(recordSchema);
}
+ public static HoodieSchema.Blob createBlob() {
+ return new HoodieSchema.Blob(Blob.DEFAULT_NAME);
+ }
+
/**
* Returns the Hudi schema version information.
*
@@ -696,7 +711,7 @@ public class HoodieSchema implements Serializable {
* @return true if this type can have fields (RECORD or VARIANT)
*/
public boolean hasFields() {
- return type == HoodieSchemaType.RECORD || type == HoodieSchemaType.VARIANT;
+ return type == HoodieSchemaType.RECORD || type == HoodieSchemaType.VARIANT || type == HoodieSchemaType.BLOB;
}
/**
@@ -941,6 +956,35 @@ public class HoodieSchema implements Serializable {
return HoodieSchema.createUnion(nonNullTypes);
}
+ boolean containsBlobType() {
+ if (getType() == HoodieSchemaType.BLOB) {
+ return true;
+ } else if (getType() == HoodieSchemaType.ARRAY) {
+ return getElementType().containsBlobType();
+ } else if (getType() == HoodieSchemaType.MAP) {
+ return getValueType().containsBlobType();
+ } else if (getType() == HoodieSchemaType.UNION) {
+ return getTypes().stream().anyMatch(HoodieSchema::containsBlobType);
+ } else if (hasFields()) {
+ return getFields().stream().anyMatch(field -> field.schema().containsBlobType());
+ }
+ return false;
+ }
+
+ /**
+ * A convenience method to check if the current field represents a blob type.
+ * This checks if the current schema is a BLOB or if it is an ARRAY or MAP whose element or value type is a BLOB, respectively.
+ * It does not check for BLOB types nested within unions or record fields.
+ * @return true if the current schema is a BLOB or an ARRAY/MAP of BLOBs, false otherwise
+ */
+ public boolean isBlobField() {
+ HoodieSchema nonNullSchema = getNonNullType();
+ HoodieSchemaType nonNullSchemaType = nonNullSchema.getType();
+ return nonNullSchemaType == HoodieSchemaType.BLOB
+ || (nonNullSchemaType == HoodieSchemaType.ARRAY &&
nonNullSchema.getElementType().getNonNullType().getType() ==
HoodieSchemaType.BLOB)
+ || (nonNullSchemaType == HoodieSchemaType.MAP &&
nonNullSchema.getValueType().getNonNullType().getType() ==
HoodieSchemaType.BLOB);
+ }
+
/**
* Gets a nested field using dot notation, supporting Parquet-style array/map accessors.
*
@@ -1814,6 +1858,11 @@ public class HoodieSchema implements Serializable {
// If not a union, it should at least be bytes (some shredded variants may have non-null value)
throw new IllegalArgumentException("Shredded Variant value field must be BYTES or nullable BYTES, got: " + valueSchema.getType());
}
+ Option.ofNullable(avroSchema.getField(Variant.VARIANT_TYPED_VALUE_FIELD)).ifPresent(field -> {
+ if (HoodieSchema.fromAvroSchema(field.schema()).containsBlobType()) {
+ throw new IllegalArgumentException("Variant typed_value field
cannot be or contain a BLOB type");
+ }
+ });
} else {
// Unshredded: value must be non-nullable bytes
if (valueSchema.getType() != Schema.Type.BYTES) {
@@ -1883,6 +1932,127 @@ public class HoodieSchema implements Serializable {
}
}
+ static class BlobLogicalType extends LogicalType {
+
+ private static final String BLOB_LOGICAL_TYPE_NAME = "blob";
+ // Eager initialization of singleton
+ private static final BlobLogicalType INSTANCE = new BlobLogicalType();
+
+ private BlobLogicalType() {
+ super(BlobLogicalType.BLOB_LOGICAL_TYPE_NAME);
+ }
+
+ public static BlobLogicalType blob() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void validate(Schema schema) {
+ super.validate(schema);
+ if (schema.getType() != Schema.Type.RECORD) {
+ throw new IllegalArgumentException("Blob logical type can only be
applied to RECORD schemas, got: " + schema.getType());
+ }
+ if (!schema.getFields().equals(HoodieSchema.Blob.BLOB_FIELDS)) {
+ throw new IllegalArgumentException("Blob logical type cannot be
applied to schema: " + schema);
+ }
+ }
+ }
+
+ /**
+ * Factory for creating BlobLogicalType instances.
+ */
+ private static class BlobLogicalTypeFactory implements LogicalTypes.LogicalTypeFactory {
+ @Override
+ public LogicalType fromSchema(Schema schema) {
+ return BlobLogicalType.blob();
+ }
+
+ @Override
+ public String getTypeName() {
+ return BlobLogicalType.BLOB_LOGICAL_TYPE_NAME;
+ }
+ }
+
+ /**
+ * Blob types represent raw binary data. The data can be stored in-line as a byte array, or out-of-line as a reference to a file, optionally with an offset and length within that file.
+ */
+ public static class Blob extends HoodieSchema {
+ private static final String DEFAULT_NAME = "blob";
+ private static final List<Schema.Field> BLOB_FIELDS = createBlobFields();
+
+ public static final String TYPE = "type";
+ public static final String INLINE_DATA_FIELD = "data";
+ public static final String EXTERNAL_REFERENCE = "reference";
+ public static final String EXTERNAL_REFERENCE_PATH = "external_path";
+ // if offset is not specified, it is assumed to be 0 (start of file)
+ public static final String EXTERNAL_REFERENCE_OFFSET = "offset";
+ // if length is not specified, it is assumed to be the rest of the file starting from offset
+ public static final String EXTERNAL_REFERENCE_LENGTH = "length";
+ public static final String EXTERNAL_REFERENCE_IS_MANAGED = "managed";
+
+ public static int getFieldCount() {
+ return BLOB_FIELDS.size();
+ }
+
+ public static int getReferenceFieldCount() {
+ return AvroSchemaUtils.getNonNullTypeFromUnion(BLOB_FIELDS.get(2).schema()).getFields().size();
+ }
+
+ /**
+ * Creates a new Blob schema with the given record name.
+ *
+ * @param name Name for the blob schema
+ */
+ private Blob(String name) {
+ super(createSchema(name));
+ }
+
+ private Blob(Schema avroSchema) {
+ super(avroSchema);
+ }
+
+ @Override
+ public String getName() {
+ return "blob";
+ }
+
+ @Override
+ public HoodieSchemaType getType() {
+ return HoodieSchemaType.BLOB;
+ }
+
+ private static Schema createSchema(String name) {
+ Schema blobSchema = Schema.createRecord(name, null, null, false);
+ // each instance requires its own copy of the fields list
+ List<Schema.Field> fields = new ArrayList<>(BLOB_FIELDS.size());
+ for (Schema.Field field : BLOB_FIELDS) {
+ fields.add(createNewSchemaField(field));
+ }
+ blobSchema.setFields(fields);
+ BlobLogicalType.blob().addToSchema(blobSchema);
+ return blobSchema;
+ }
+
+ private static List<Schema.Field> createBlobFields() {
+ Schema bytesField = Schema.create(Schema.Type.BYTES);
+ Schema referenceField = Schema.createRecord(EXTERNAL_REFERENCE, null, null, false);
+ List<Schema.Field> referenceFields = Arrays.asList(
+ new Schema.Field(EXTERNAL_REFERENCE_PATH, Schema.create(Schema.Type.STRING), null, null),
+ new Schema.Field(EXTERNAL_REFERENCE_OFFSET, AvroSchemaUtils.createNullableSchema(Schema.create(Schema.Type.LONG)), null, null),
+ new Schema.Field(EXTERNAL_REFERENCE_LENGTH, AvroSchemaUtils.createNullableSchema(Schema.create(Schema.Type.LONG)), null, null),
+ new Schema.Field(EXTERNAL_REFERENCE_IS_MANAGED, Schema.create(Schema.Type.BOOLEAN), null, null)
+ );
+ referenceField.setFields(referenceFields);
+
+ return Arrays.asList(
+ new Schema.Field(TYPE, Schema.createEnum("blob_storage_type", null,
null, Arrays.asList("INLINE", "OUT_OF_LINE")), null, null),
+ new Schema.Field(INLINE_DATA_FIELD,
AvroSchemaUtils.createNullableSchema(bytesField), null,
Schema.Field.NULL_DEFAULT_VALUE),
+ new Schema.Field(EXTERNAL_REFERENCE,
AvroSchemaUtils.createNullableSchema(referenceField), null,
Schema.Field.NULL_DEFAULT_VALUE)
+ );
+ }
+ }
+
private void writeObject(ObjectOutputStream oos) throws IOException {
oos.defaultWriteObject();
oos.writeObject(avroSchema.toString());
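Since the blob schema is carried as an Avro logical type, it survives a JSON round trip; a minimal sketch of what the TestHoodieSchema additions below verify (class name is illustrative):

import org.apache.hudi.common.schema.HoodieSchema;

public class BlobJsonRoundTripSketch {
  public static void main(String[] args) {
    // toString() emits Avro JSON carrying the "blob" logical type,
    // so parse() reconstructs a HoodieSchema.Blob rather than a plain RECORD.
    String blobJson = HoodieSchema.createBlob().toString();
    HoodieSchema parsed = HoodieSchema.parse(blobJson);
    System.out.println(parsed.getType());                    // BLOB
    System.out.println(parsed instanceof HoodieSchema.Blob); // true
  }
}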
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java
index c8c76c7272f7..e3f28165f0db 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java
@@ -154,6 +154,7 @@ public class HoodieSchemaComparatorForSchemaEvolution {
switch (s1.getType()) {
case RECORD:
+ case BLOB:
return recordSchemaEquals(s1, s2);
case ENUM:
return enumSchemaEquals(s1, s2);
@@ -190,6 +191,10 @@ public class HoodieSchemaComparatorForSchemaEvolution {
}
protected boolean validateRecord(HoodieSchema s1, HoodieSchema s2) {
+ // BLOB schemas are never error records, so skip the error check for BLOB
+ if (s1.getType() == HoodieSchemaType.BLOB) {
+ return true;
+ }
return s1.isError() == s2.isError();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibilityChecker.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibilityChecker.java
index bede6afb621b..66935b6d1745 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibilityChecker.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibilityChecker.java
@@ -115,7 +115,7 @@ public class HoodieSchemaCompatibilityChecker {
* @return the writer field, if any does correspond, or None.
*/
public static HoodieSchemaField lookupWriterField(final HoodieSchema writerSchema, final HoodieSchemaField readerField) {
- assert (writerSchema.getType() == HoodieSchemaType.RECORD);
+ assert (writerSchema.hasFields());
final List<HoodieSchemaField> writerFields = new ArrayList<>();
writerSchema.getField(readerField.name()).ifPresent(writerFields::add);
for (final String readerFieldAliasName : readerField.aliases()) {
@@ -316,6 +316,7 @@ public class HoodieSchemaCompatibilityChecker {
result = result.mergedWith(checkSchemaNames(reader, writer, locations));
return result.mergedWith(checkReaderEnumContainsAllWriterEnumSymbols(reader, writer, locations));
case RECORD:
+ case BLOB:
result = result.mergedWith(checkSchemaNames(reader, writer, locations));
return result.mergedWith(checkReaderWriterRecordFields(reader, writer, locations));
case UNION:
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaProjectionChecker.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaProjectionChecker.java
index fc323eb0820a..0784c185990c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaProjectionChecker.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaProjectionChecker.java
@@ -97,6 +97,7 @@ class HoodieSchemaProjectionChecker {
if (sourceSchema.getType() == targetSchema.getType()) {
switch (sourceSchema.getType()) {
case RECORD:
+ case BLOB:
// For records, every target field must exist in source with compatible schema
for (HoodieSchemaField targetField : targetSchema.getFields()) {
Option<HoodieSchemaField> sourceFieldOpt = sourceSchema.getField(targetField.name());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaType.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaType.java
index 81b7f94ace0d..199d144a07d6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaType.java
@@ -119,6 +119,11 @@ public enum HoodieSchemaType {
VARIANT(Schema.Type.RECORD),
+ /**
+ * BLOB type - represents binary large objects, stored as records with either inline bytes or a reference to a file and range within that file.
+ */
+ BLOB(Schema.Type.RECORD),
+
/**
* Null type - represents the absence of a value
*/
@@ -156,6 +161,8 @@ public enum HoodieSchemaType {
return UUID;
} else if (logicalType instanceof VariantLogicalType) {
return VARIANT;
+ } else if (logicalType == HoodieSchema.BlobLogicalType.blob()) {
+ return BLOB;
}
}
switch (avroSchema.getType()) {
@@ -223,6 +230,7 @@ public enum HoodieSchemaType {
case MAP:
case UNION:
case VARIANT:
+ case BLOB:
return true;
default:
return false;
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 2644f9e07884..2beb626fe9aa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -2047,14 +2047,16 @@ public class HoodieTableMetadataUtil {
// HUDI-8585 will add support for BYTES and FIXED
return type != HoodieSchemaType.RECORD && type != HoodieSchemaType.ARRAY
&& type != HoodieSchemaType.MAP
&& type != HoodieSchemaType.ENUM && type != HoodieSchemaType.BYTES &&
type != HoodieSchemaType.FIXED
- && type != HoodieSchemaType.DECIMAL; // DECIMAL's underlying type is
BYTES
+ && type != HoodieSchemaType.DECIMAL // DECIMAL's underlying type is
BYTES
+ && type != HoodieSchemaType.BLOB;
}
private static boolean isColumnTypeSupportedV2(HoodieSchema schema) {
HoodieSchemaType type = schema.getType();
// Check for precision and scale if the schema has a logical decimal type.
return type != HoodieSchemaType.RECORD && type != HoodieSchemaType.MAP
- && type != HoodieSchemaType.ARRAY && type != HoodieSchemaType.ENUM;
+ && type != HoodieSchemaType.ARRAY && type != HoodieSchemaType.ENUM
+ && type != HoodieSchemaType.BLOB;
}
public static Set<String> getInflightMetadataPartitions(HoodieTableConfig
tableConfig) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
index d473e0efc63a..286e739e52a9 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
@@ -1689,4 +1689,229 @@ public class TestHoodieSchema {
// Verify the logical type name
assertEquals("variant", instance1.getName());
}
+
+ @Test
+ public void testCreateBlob() {
+ HoodieSchema.Blob blob = HoodieSchema.createBlob();
+
+ assertNotNull(blob);
+ assertEquals("blob", blob.getName());
+ assertEquals(HoodieSchemaType.BLOB, blob.getType());
+
+ // Verify storage_type field
+ Option<HoodieSchemaField> storageTypeOpt = blob.getField("type");
+ assertTrue(storageTypeOpt.isPresent());
+ HoodieSchemaField storageTypeField = storageTypeOpt.get();
+ assertEquals(HoodieSchemaType.ENUM, storageTypeField.schema().getType());
+ assertEquals(Arrays.asList("INLINE", "OUT_OF_LINE"),
storageTypeField.schema().getEnumSymbols());
+ assertFalse(storageTypeField.schema().isNullable());
+
+ // Verify data field is nullable
+ Option<HoodieSchemaField> dataOpt = blob.getField("data");
+ assertTrue(dataOpt.isPresent());
+ HoodieSchemaField dataField = dataOpt.get();
+ assertTrue(dataField.schema().isNullable());
+ assertEquals(HoodieSchemaType.BYTES, dataField.schema().getNonNullType().getType());
+
+ // Verify reference field is nullable
+ Option<HoodieSchemaField> refOpt = blob.getField("reference");
+ assertTrue(refOpt.isPresent());
+ HoodieSchemaField refField = refOpt.get();
+ assertTrue(refField.schema().isNullable());
+ assertEquals(HoodieSchemaType.RECORD, refField.schema().getNonNullType().getType());
+
+ HoodieSchema refSchema = refOpt.get().schema().getNonNullType();
+ assertEquals(HoodieSchemaType.RECORD, refSchema.getType());
+
+ // Verify reference has all required fields
+ Option<HoodieSchemaField> externalPathOpt =
refSchema.getField("external_path");
+ assertTrue(externalPathOpt.isPresent());
+ assertEquals(HoodieSchemaType.STRING,
externalPathOpt.get().schema().getType());
+ assertFalse(externalPathOpt.get().schema().isNullable());
+
+ Option<HoodieSchemaField> offsetOpt = refSchema.getField("offset");
+ assertTrue(offsetOpt.isPresent());
+ assertEquals(HoodieSchemaType.LONG, offsetOpt.get().schema().getNonNullType().getType());
+ assertTrue(offsetOpt.get().schema().isNullable());
+
+ Option<HoodieSchemaField> lengthOpt = refSchema.getField("length");
+ assertTrue(lengthOpt.isPresent());
+ assertEquals(HoodieSchemaType.LONG, lengthOpt.get().schema().getNonNullType().getType());
+ assertTrue(lengthOpt.get().schema().isNullable());
+
+ Option<HoodieSchemaField> managedOpt = refSchema.getField("managed");
+ assertTrue(managedOpt.isPresent());
+ assertEquals(HoodieSchemaType.BOOLEAN, managedOpt.get().schema().getType());
+ assertFalse(managedOpt.get().schema().isNullable());
+ }
+
+ @Test
+ public void testBlobLogicalTypeDetection() {
+ HoodieSchema.Blob blob = HoodieSchema.createBlob();
+
+ // Verify logical type is set
+ Schema avroSchema = blob.toAvroSchema();
+ assertNotNull(avroSchema.getLogicalType());
+ assertEquals("blob", avroSchema.getLogicalType().getName());
+
+ // Verify it can be detected back
+ HoodieSchema reconstructed = HoodieSchema.fromAvroSchema(avroSchema);
+ assertInstanceOf(HoodieSchema.Blob.class, reconstructed);
+ assertEquals(HoodieSchemaType.BLOB, reconstructed.getType());
+ }
+
+ @Test
+ public void testBlobsAsRecordFields() {
+ HoodieSchema recordSchema = HoodieSchema.createRecord("test_record", null,
null, Arrays.asList(
+ HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
+ HoodieSchemaField.of("file_data", HoodieSchema.createBlob()),
+ HoodieSchemaField.of("image_data", HoodieSchema.createBlob())
+ ));
+
+ Option<HoodieSchemaField> blobFieldOpt =
recordSchema.getField("file_data");
+ assertTrue(blobFieldOpt.isPresent());
+ HoodieSchemaField blobField = blobFieldOpt.get();
+ assertEquals(HoodieSchemaType.BLOB, blobField.schema().getType());
+
+ // Verify the blob field has proper structure
+ HoodieSchema blobSchema = blobField.schema();
+ assertInstanceOf(HoodieSchema.Blob.class, blobSchema);
+
+ // Validate the blob schema can be converted to string and back without losing the logical type
+ String recordJson = recordSchema.toString();
+ HoodieSchema parsedRecord = HoodieSchema.parse(recordJson);
+
+ Option<HoodieSchemaField> parsedFileDataFieldOpt =
parsedRecord.getField("file_data");
+ assertTrue(parsedFileDataFieldOpt.isPresent());
+ HoodieSchemaField parsedFileDataField = parsedFileDataFieldOpt.get();
+ assertInstanceOf(HoodieSchema.Blob.class, parsedFileDataField.schema());
+ assertEquals(HoodieSchemaType.BLOB, parsedFileDataField.schema().getType());
+ assertSame(HoodieSchema.BlobLogicalType.blob(), parsedFileDataField.schema().toAvroSchema().getLogicalType());
+
+ Option<HoodieSchemaField> parsedImageDataFieldOpt =
parsedRecord.getField("image_data");
+ assertTrue(parsedImageDataFieldOpt.isPresent());
+ HoodieSchemaField parsedImageDataField = parsedImageDataFieldOpt.get();
+ assertInstanceOf(HoodieSchema.Blob.class, parsedImageDataField.schema());
+ assertEquals(HoodieSchemaType.BLOB, parsedImageDataField.schema().getType());
+ assertSame(HoodieSchema.BlobLogicalType.blob(), parsedImageDataField.schema().toAvroSchema().getLogicalType());
+ }
+
+ private static final String BLOB_JSON = HoodieSchema.createBlob().toString();
+
+ @Test
+ public void testParseBlobFromJsonWithLogicalType() {
+ // JSON representation of a Blob schema
+ HoodieSchema parsedSchema = HoodieSchema.parse(BLOB_JSON);
+
+ assertInstanceOf(HoodieSchema.Blob.class, parsedSchema);
+ HoodieSchema.Blob parsedBlob = (HoodieSchema.Blob) parsedSchema;
+ assertEquals(HoodieSchemaType.BLOB, parsedBlob.getType());
+ }
+
+ private static HoodieSchema createRecordWithBlob() {
+ return HoodieSchema.createRecord("record", null, null,
+ Arrays.asList(HoodieSchemaField.of("field1",
HoodieSchema.create(HoodieSchemaType.INT)), HoodieSchemaField.of("field2",
HoodieSchema.createBlob())));
+ }
+
+ @Test
+ public void testContainsBlobTypeDirectBlob() {
+ HoodieSchema blob = HoodieSchema.createBlob();
+ assertTrue(blob.containsBlobType());
+ }
+
+ @Test
+ public void testContainsBlobTypeInUnion() {
+ HoodieSchema unionWithBlob = HoodieSchema.createUnion(
+ HoodieSchema.create(HoodieSchemaType.STRING),
+ HoodieSchema.createBlob()
+ );
+ assertTrue(unionWithBlob.containsBlobType());
+
+ HoodieSchema unionNonBlob = HoodieSchema.createUnion(
+ HoodieSchema.create(HoodieSchemaType.STRING),
+ HoodieSchema.create(HoodieSchemaType.INT)
+ );
+ assertFalse(unionNonBlob.containsBlobType());
+ }
+
+ @Test
+ public void testContainsBlobTypeNullable() {
+ HoodieSchema nullableBlob = HoodieSchema.createNullable(HoodieSchema.createBlob());
+ assertTrue(nullableBlob.containsBlobType());
+
+ HoodieSchema nestedNullableBlob = HoodieSchema.createRecord("record",
null, null, Arrays.asList(
+ HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
+ HoodieSchemaField.of("data",
HoodieSchema.createNullable(HoodieSchema.createBlob()))
+ ));
+ assertTrue(nestedNullableBlob.containsBlobType());
+ }
+
+ @Test
+ public void testContainsBlobTypeRecordWithoutBlob() {
+ HoodieSchema schema = HoodieSchema.createRecord("test", null, null,
Arrays.asList(
+ HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
+ HoodieSchemaField.of("name",
HoodieSchema.create(HoodieSchemaType.STRING))
+ ));
+ assertFalse(schema.containsBlobType());
+ }
+
+ @Test
+ public void testContainsBlobTypeInMaps() {
+ HoodieSchema mapWithBlob = HoodieSchema.createMap(HoodieSchema.createBlob());
+ assertTrue(mapWithBlob.containsBlobType());
+
+ HoodieSchema mapWithNestedBlob = HoodieSchema.createMap(HoodieSchema.createNullable(createRecordWithBlob()));
+ assertTrue(mapWithNestedBlob.containsBlobType());
+
+ HoodieSchema mapNonBlob = HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.STRING));
+ assertFalse(mapNonBlob.containsBlobType());
+ }
+
+ @Test
+ public void testContainsBlobTypeInArrays() {
+ HoodieSchema arrayWithBlob = HoodieSchema.createArray(HoodieSchema.createBlob());
+ assertTrue(arrayWithBlob.containsBlobType());
+
+ HoodieSchema arrayWithNestedBlob = HoodieSchema.createArray(HoodieSchema.createNullable(createRecordWithBlob()));
+ assertTrue(arrayWithNestedBlob.containsBlobType());
+
+ HoodieSchema arrayNonBlob = HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING));
+ assertFalse(arrayNonBlob.containsBlobType());
+ }
+
+ @Test
+ public void testCreateVariantWithBlobTypedValueShouldFail() {
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.createVariantShredded(HoodieSchema.createBlob()));
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.createVariantShredded(createRecordWithBlob()));
+ }
+
+ @Test
+ public void testParseRecordWithVariantWithBlobTypedValueShouldFail() {
+ String schemaJson = "{\"type\":\"record\",\"name\":\"test\",\"fields\":["
+ + "{\"name\":\"data\",\"type\":{\"type\":\"record\",\"name\":\"variant\",\"logicalType\":\"variant\",\"fields\":["
+ + "{\"name\":\"metadata\",\"type\":\"bytes\"},"
+ + "{\"name\":\"value\",\"type\":\"bytes\"},"
+ + "{\"name\":\"typed_value\",\"type\":" + BLOB_JSON + "}"
+ + "]}}"
+ + "]}";
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchema.parse(schemaJson).getField("data"));
+ }
+
+ @Test
+ public void testIsBlobField() {
+ // blobs, arrays with blob elements, and maps with blob values should be detected as blob fields
+ assertTrue(HoodieSchema.createBlob().isBlobField());
+ assertTrue(HoodieSchema.createNullable(HoodieSchema.createBlob()).isBlobField());
+ assertTrue(HoodieSchema.createMap(HoodieSchema.createBlob()).isBlobField());
+ assertTrue(HoodieSchema.createNullable(HoodieSchema.createMap(HoodieSchema.createNullable(HoodieSchema.createBlob()))).isBlobField());
+ assertTrue(HoodieSchema.createArray(HoodieSchema.createBlob()).isBlobField());
+ assertTrue(HoodieSchema.createNullable(HoodieSchema.createArray(HoodieSchema.createNullable(HoodieSchema.createBlob()))).isBlobField());
+ // non-blob types should not be detected as blob fields
+ assertFalse(HoodieSchema.create(HoodieSchemaType.STRING).isBlobField());
+ // A record that contains a blob field should not be detected as a blob field itself
+ assertFalse(createRecordWithBlob().isBlobField());
+ // Similarly, arrays and maps with records as the element/value type should not be detected as blob fields
+ assertFalse(HoodieSchema.createArray(createRecordWithBlob()).isBlobField());
+ assertFalse(HoodieSchema.createMap(createRecordWithBlob()).isBlobField());
+ }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java
index e28d694590e0..29d53f0c146c 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java
@@ -502,4 +502,113 @@ class TestHoodieSchemaComparatorForSchemaEvolution {
HoodieSchema.parse(timeMicros)
));
}
+
+ @Test
+ void testBlobSchemaEquality() {
+ // Tests that BLOB schemas are compared correctly
+ // BLOB schemas should be equal to themselves and other identical BLOB schemas
+ HoodieSchema blob1 = HoodieSchema.createBlob();
+ HoodieSchema blob2 = HoodieSchema.createBlob();
+
+ // Two identical BLOB schemas should be equal
+ assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(blob1, blob2));
+
+ // BLOB schema should equal itself
+ assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(blob1, blob1));
+
+ // BLOB and RECORD with same structure should NOT be equal (different types)
+ HoodieSchema recordWithSameStructure = HoodieSchema.createRecord("record", null, null, false,
+ Arrays.asList(
+ HoodieSchemaField.of("type", HoodieSchema.create(HoodieSchemaType.STRING), null, null),
+ HoodieSchemaField.of("data", HoodieSchema.createNullable(HoodieSchemaType.BYTES), null, null),
+ HoodieSchemaField.of("reference", HoodieSchema.createNullable(
+ HoodieSchema.createRecord("ref", null, null, false,
+ Arrays.asList(
+ HoodieSchemaField.of("external_path",
HoodieSchema.create(HoodieSchemaType.STRING), null, null),
+ HoodieSchemaField.of("offset",
HoodieSchema.createNullable(HoodieSchemaType.LONG), null, null),
+ HoodieSchemaField.of("length",
HoodieSchema.createNullable(HoodieSchemaType.LONG), null, null),
+ HoodieSchemaField.of("managed",
HoodieSchema.create(HoodieSchemaType.BOOLEAN), null, null)
+ ))), null, null)
+ ));
+
+ assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(blob1, recordWithSameStructure));
+ }
+
+ @Test
+ void testBlobInNestedStructures() {
+ // Tests that BLOB handling works correctly in complex nested types
+ HoodieSchema blob = HoodieSchema.createBlob();
+
+ // BLOB inside ARRAY
+ HoodieSchema arrayOfBlobs1 = HoodieSchema.createArray(blob);
+ HoodieSchema arrayOfBlobs2 = HoodieSchema.createArray(HoodieSchema.createBlob());
+ assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(arrayOfBlobs1, arrayOfBlobs2));
+
+ // BLOB inside MAP
+ HoodieSchema mapWithBlobs1 = HoodieSchema.createMap(blob);
+ HoodieSchema mapWithBlobs2 = HoodieSchema.createMap(HoodieSchema.createBlob());
+ assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(mapWithBlobs1, mapWithBlobs2));
+
+ // BLOB inside RECORD field
+ HoodieSchema recordWithBlob1 = HoodieSchema.createRecord("test", null,
null, false,
+ Collections.singletonList(
+ HoodieSchemaField.of("blob_field", blob, null, null)
+ ));
+ HoodieSchema recordWithBlob2 = HoodieSchema.createRecord("test", null,
null, false,
+ Collections.singletonList(
+ HoodieSchemaField.of("blob_field", HoodieSchema.createBlob(),
null, null)
+ ));
+
assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(recordWithBlob1,
recordWithBlob2));
+
+ // ARRAY of different types should not be equal
+ HoodieSchema arrayOfStrings = HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING));
+ assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(arrayOfBlobs1, arrayOfStrings));
+ }
+
+ @Test
+ void testBlobFieldEquality() {
+ // Tests that BLOB field comparison follows RECORD rules
+ // BLOBs have a fixed structure with 3 fields: type, data, reference
+ HoodieSchema blob1 = HoodieSchema.createBlob();
+ HoodieSchema blob2 = HoodieSchema.createBlob();
+
+ // Same BLOB schemas should be equal
+ assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(blob1, blob2));
+
+ // Create a BLOB-like record with different field types
+ HoodieSchema blobWithDifferentTypes = HoodieSchema.createRecord("blob",
null, null, false,
+ Arrays.asList(
+ HoodieSchemaField.of("type",
HoodieSchema.create(HoodieSchemaType.INT), null, null), // Changed from STRING
to INT
+ HoodieSchemaField.of("data",
HoodieSchema.createNullable(HoodieSchemaType.BYTES), null, null),
+ HoodieSchemaField.of("reference", HoodieSchema.createNullable(
+ HoodieSchema.createRecord("ref", null, null, false,
+ Arrays.asList(
+ HoodieSchemaField.of("external_path",
HoodieSchema.create(HoodieSchemaType.STRING), null, null),
+ HoodieSchemaField.of("offset",
HoodieSchema.createNullable(HoodieSchemaType.LONG), null, null),
+ HoodieSchemaField.of("length",
HoodieSchema.createNullable(HoodieSchemaType.LONG), null, null),
+ HoodieSchemaField.of("managed",
HoodieSchema.create(HoodieSchemaType.BOOLEAN), null, null)
+ ))), null, null)
+ ));
+
+ // BLOB with different field types should not be equal (even though structure is similar)
+ assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(blob1, blobWithDifferentTypes));
+
+ // Create a BLOB-like record with fields in different order
+ HoodieSchema blobWithDifferentOrder = HoodieSchema.createRecord("blob",
null, null, false,
+ Arrays.asList(
+ HoodieSchemaField.of("data",
HoodieSchema.createNullable(HoodieSchemaType.BYTES), null, null),
+ HoodieSchemaField.of("type",
HoodieSchema.create(HoodieSchemaType.STRING), null, null), // Different order
+ HoodieSchemaField.of("reference", HoodieSchema.createNullable(
+ HoodieSchema.createRecord("ref", null, null, false,
+ Arrays.asList(
+ HoodieSchemaField.of("external_path",
HoodieSchema.create(HoodieSchemaType.STRING), null, null),
+ HoodieSchemaField.of("offset",
HoodieSchema.createNullable(HoodieSchemaType.LONG), null, null),
+ HoodieSchemaField.of("length",
HoodieSchema.createNullable(HoodieSchemaType.LONG), null, null),
+ HoodieSchemaField.of("managed",
HoodieSchema.create(HoodieSchemaType.BOOLEAN), null, null)
+ ))), null, null)
+ ));
+
+ // BLOB with fields in different order should not be equal (order matters)
+ assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(blob1, blobWithDifferentOrder));
+ }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaCompatibility.java b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaCompatibility.java
index 36c71fb59f4e..6ff89d6e8586 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaCompatibility.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaCompatibility.java
@@ -725,4 +725,93 @@ public class TestHoodieSchemaCompatibility {
assertTrue(HoodieSchemaCompatibility.isSchemaCompatible(schema, schema, false, true));
assertTrue(HoodieSchemaCompatibility.isSchemaCompatible(schema, schema, false, false));
}
+
+ @Test
+ public void testCompatibilityWithBlobFields() {
+ // Tests that BLOB compatibility follows RECORD rules
+ HoodieSchema writerBlob = HoodieSchema.createBlob();
+ HoodieSchema readerBlob = HoodieSchema.createBlob();
+
+ // Reader BLOB with all writer BLOB fields should be compatible
+ assertTrue(HoodieSchemaCompatibility.isSchemaCompatible(writerBlob,
readerBlob, true, false));
+
+ // Same BLOB schemas are compatible both ways
+ assertTrue(HoodieSchemaCompatibility.isSchemaCompatible(readerBlob,
writerBlob, true, false));
+
+ // Test BLOB in a record context
+ HoodieSchema writerSchema = HoodieSchema.createRecord("test", null, null,
false,
+ Arrays.asList(
+ HoodieSchemaField.of("id",
HoodieSchema.create(HoodieSchemaType.INT), null, null),
+ HoodieSchemaField.of("blob_field", writerBlob, null, null)
+ ));
+
+ HoodieSchema readerSchema = HoodieSchema.createRecord("test", null, null,
false,
+ Arrays.asList(
+ HoodieSchemaField.of("id",
HoodieSchema.create(HoodieSchemaType.INT), null, null),
+ HoodieSchemaField.of("blob_field", readerBlob, null, null)
+ ));
+
+ // Records containing BLOBs should be compatible
+ assertTrue(HoodieSchemaCompatibility.isSchemaCompatible(writerSchema,
readerSchema, true, false));
+
+ // Test writer with extra fields (reader can ignore with projection
allowed)
+ HoodieSchema writerWithExtraField = HoodieSchema.createRecord("test",
null, null, false,
+ Arrays.asList(
+ HoodieSchemaField.of("id",
HoodieSchema.create(HoodieSchemaType.INT), null, null),
+ HoodieSchemaField.of("blob_field", writerBlob, null, null),
+ HoodieSchemaField.of("extra_field",
HoodieSchema.createNullable(HoodieSchemaType.STRING), null, null)
+ ));
+
+ // Reader can ignore extra fields if projection is allowed
+
assertTrue(HoodieSchemaCompatibility.isSchemaCompatible(writerWithExtraField,
readerSchema, true, true));
+
+ // Source with more fields projects to target with fewer fields
+
assertTrue(HoodieSchemaCompatibility.isStrictProjectionOf(writerWithExtraField,
readerSchema));
+
assertTrue(HoodieSchemaCompatibility.isCompatibleProjectionOf(writerWithExtraField,
readerSchema));
+ // Opposite should be false
+ assertFalse(HoodieSchemaCompatibility.isStrictProjectionOf(readerSchema,
writerWithExtraField));
+ }
+
+ @Test
+ public void testBlobAreSchemasProjectionEquivalent() {
+ // Tests that BLOB projection equivalence works correctly
+ HoodieSchema blob1 = HoodieSchema.createBlob();
+ HoodieSchema blob2 = HoodieSchema.createBlob();
+
+ // Two BLOBs with same field structure are equivalent
+ assertTrue(HoodieSchemaCompatibility.areSchemasProjectionEquivalent(blob1,
blob2));
+
+ // BLOB is equivalent to itself
+ assertTrue(HoodieSchemaCompatibility.areSchemasProjectionEquivalent(blob1,
blob1));
+
+ // BLOBs in arrays are projection equivalent
+ HoodieSchema arrayBlob1 = HoodieSchema.createArray(blob1);
+ HoodieSchema arrayBlob2 = HoodieSchema.createArray(blob2);
+ arrayBlob1.addProp("prop1", "value1"); // prevent Objects.equals from
returning true
+
assertTrue(HoodieSchemaCompatibility.areSchemasProjectionEquivalent(arrayBlob1,
arrayBlob2));
+
+ // BLOBs in maps are projection equivalent
+ HoodieSchema mapBlob1 = HoodieSchema.createMap(blob1);
+ HoodieSchema mapBlob2 = HoodieSchema.createMap(blob2);
+ mapBlob1.addProp("prop1", "value1"); // prevent Objects.equals from
returning true
+
assertTrue(HoodieSchemaCompatibility.areSchemasProjectionEquivalent(mapBlob1,
mapBlob2));
+
+    // BLOBs nested within a record structure
+ HoodieSchema recordWithBlob1 = HoodieSchema.createRecord("test", null,
null, false,
+ Collections.singletonList(
+ HoodieSchemaField.of("nested_blob", blob1, null, null)
+ ));
+
+ HoodieSchema recordWithBlob2 = HoodieSchema.createRecord("test2", null,
null, false,
+ Collections.singletonList(
+ HoodieSchemaField.of("nested_blob", blob2, null, null)
+ ));
+
+ // Records with BLOBs should be projection equivalent (names can differ)
+
assertTrue(HoodieSchemaCompatibility.areSchemasProjectionEquivalent(recordWithBlob1,
recordWithBlob2));
+
+ // BLOB vs non-BLOB should not be equivalent
+ HoodieSchema stringSchema = HoodieSchema.create(HoodieSchemaType.STRING);
+
assertFalse(HoodieSchemaCompatibility.areSchemasProjectionEquivalent(blob1,
stringSchema));
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaType.java
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaType.java
index 56d85a5a500b..a14435b2e7d8 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaType.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaType.java
@@ -84,6 +84,8 @@ public class TestHoodieSchemaType {
assertTrue(HoodieSchemaType.ARRAY.isComplex(), "ARRAY should be complex");
assertTrue(HoodieSchemaType.MAP.isComplex(), "MAP should be complex");
assertTrue(HoodieSchemaType.UNION.isComplex(), "UNION should be complex");
+ assertTrue(HoodieSchemaType.VARIANT.isComplex(), "VARIANT should be
complex");
+ assertTrue(HoodieSchemaType.BLOB.isComplex(), "BLOB should be complex");
assertFalse(HoodieSchemaType.STRING.isComplex(), "STRING should not be
complex");
assertFalse(HoodieSchemaType.INT.isComplex(), "INT should not be complex");
@@ -114,6 +116,8 @@ public class TestHoodieSchemaType {
assertFalse(HoodieSchemaType.ARRAY.isNumeric(), "ARRAY should not be
numeric");
assertFalse(HoodieSchemaType.MAP.isNumeric(), "MAP should not be numeric");
assertFalse(HoodieSchemaType.UNION.isNumeric(), "UNION should not be
numeric");
+ assertFalse(HoodieSchemaType.VARIANT.isNumeric(), "VARIANT should not be
numeric");
+ assertFalse(HoodieSchemaType.BLOB.isNumeric(), "BLOB should not be
numeric");
}
@Test
@@ -202,6 +206,7 @@ public class TestHoodieSchemaType {
map.put(HoodieSchemaType.UUID,
LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)));
map.put(HoodieSchemaType.VARIANT, createVariantSchemaForTest());
+ map.put(HoodieSchemaType.BLOB, HoodieSchema.createBlob().toAvroSchema());
return map;
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
index 7628bb09c03d..038dcafab18b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala
@@ -18,12 +18,15 @@
package org.apache.spark.sql.avro
import org.apache.hudi.avro.model.HoodieMetadataColumnStats
-import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaField,
HoodieSchemaType}
import org.apache.avro.JsonProperties
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.apache.spark.sql.types.{DataTypes, MetadataBuilder, StructField,
StructType}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertThrows, assertTrue}
import org.junit.jupiter.api.Test
+import java.util
+
class TestSchemaConverters {
@Test
@@ -46,4 +49,159 @@ class TestSchemaConverters {
}
}
}
+
+ @Test
+ def testSchemaWithBlobsRoundTrip(): Unit = {
+ val originalSchema = HoodieSchema.createRecord("document", "test", null,
util.Arrays.asList(
+ HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.LONG)),
+ HoodieSchemaField.of("metadata", HoodieSchema.createRecord("meta", null,
null, util.Arrays.asList(
+ HoodieSchemaField.of("image", HoodieSchema.createBlob()),
+ HoodieSchemaField.of("thumbnail",
HoodieSchema.createNullable(HoodieSchema.createBlob()))
+ )))))
+
+ // Hudi -> Spark
+ val sparkType = HoodieSparkSchemaConverters.toSqlType(originalSchema)._1
+ // validate the metadata is set on the blob fields and nullability is
preserved
+ val metadataSparkField =
sparkType.asInstanceOf[StructType].fields.find(_.name ==
"metadata").get.dataType.asInstanceOf[StructType]
+ val thumbNailSparkField = metadataSparkField.fields.find(_.name ==
"thumbnail").get
+ assertEquals(HoodieSchemaType.BLOB.name(),
thumbNailSparkField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+ assertTrue(thumbNailSparkField.nullable)
+ validateBlobFields(thumbNailSparkField.dataType.asInstanceOf[StructType])
+ val imageSparkField = metadataSparkField.fields.find(_.name == "image").get
+ assertEquals(HoodieSchemaType.BLOB.name(),
imageSparkField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+ assertFalse(imageSparkField.nullable)
+ validateBlobFields(imageSparkField.dataType.asInstanceOf[StructType])
+
+ // Spark -> Hudi
+ val reconstructed = HoodieSparkSchemaConverters.toHoodieType(sparkType,
recordName = "document", nameSpace = "test")
+ // Verify the blob type and nullability are preserved in the reconstructed
schema
+ assertTrue(reconstructed.getField("id").isPresent)
+ val metadataField = reconstructed.getField("metadata").get()
+ val thumbnailField = metadataField.schema().getField("thumbnail").get()
+ assertTrue(thumbnailField.schema().isNullable)
+ assertEquals(HoodieSchemaType.BLOB,
thumbnailField.schema().getNonNullType.getType)
+ val imageField = metadataField.schema().getField("image").get()
+ assertFalse(imageField.schema().isNullable)
+ assertEquals(HoodieSchemaType.BLOB, imageField.schema().getType)
+ }
+
+ @Test
+ def testInvalidBlobSchema(): Unit = {
+ // Struct with only 2 fields marked as blob
+ val invalidStruct = new StructType(Array[StructField](
+ StructField(HoodieSchema.Blob.TYPE, DataTypes.StringType, nullable =
false),
+ StructField(HoodieSchema.Blob.INLINE_DATA_FIELD, DataTypes.BinaryType,
nullable = true)
+ ))
+
+ val metadata = new MetadataBuilder()
+ .putString(HoodieSchema.TYPE_METADATA_FIELD,
HoodieSchemaType.BLOB.name())
+ .build()
+
+ val exception = assertThrows(classOf[IllegalArgumentException], () => {
+ HoodieSparkSchemaConverters.toHoodieType(invalidStruct, nullable =
false, metadata = metadata)
+ })
+ assertTrue(exception.getMessage.startsWith("Invalid blob schema
structure"))
+ }
+
+ @Test
+ def testBlobArrayRoundtrip(): Unit = {
+ // Test array containing blobs at various nesting levels
+ val innerSchema = HoodieSchema.createRecord("nested", null, null,
util.Arrays.asList(
+ HoodieSchemaField.of("nested_long",
HoodieSchema.create(HoodieSchemaType.LONG)),
+ HoodieSchemaField.of("nested_blob", HoodieSchema.createBlob())))
+ val outerArray = HoodieSchema.createArray(innerSchema)
+
+ val fields = util.Arrays.asList(
+ HoodieSchemaField.of("simple_blobs",
HoodieSchema.createArray(HoodieSchema.createBlob())),
+ HoodieSchemaField.of("simple_nullable_blobs",
HoodieSchema.createArray(HoodieSchema.createNullable(HoodieSchema.createBlob()))),
+ HoodieSchemaField.of("nested_blobs", outerArray)
+ )
+ val originalSchema = HoodieSchema.createRecord("BlobArrays", "test", null,
fields)
+
+ // Roundtrip
+ val (sparkType, _) = HoodieSparkSchemaConverters.toSqlType(originalSchema)
+ val reconstructed = HoodieSparkSchemaConverters.toHoodieType(sparkType,
recordName = "BlobArrays", nameSpace = "test")
+
+ // Verify simple array
+ val simpleField = reconstructed.getField("simple_blobs").get()
+ assertEquals(HoodieSchemaType.ARRAY, simpleField.schema.getType)
+ assertFalse(simpleField.schema.getElementType.isNullable)
+ assertEquals(HoodieSchemaType.BLOB,
simpleField.schema.getElementType.getType)
+
+ // Verify simple nullable array
+ val nullableField = reconstructed.getField("simple_nullable_blobs").get()
+ assertEquals(HoodieSchemaType.ARRAY, nullableField.schema.getType)
+ assertTrue(nullableField.schema.getElementType.isNullable)
+ assertEquals(HoodieSchemaType.BLOB,
nullableField.schema.getElementType.getNonNullType.getType)
+
+ // Verify nested array
+ val nestedField = reconstructed.getField("nested_blobs").get()
+ assertEquals(HoodieSchemaType.ARRAY, nestedField.schema.getType)
+ val nestedArrayType = nestedField.schema.getElementType
+ assertEquals(HoodieSchemaType.RECORD, nestedArrayType.getType)
+ assertEquals(HoodieSchemaType.BLOB,
nestedArrayType.getField("nested_blob").get.schema.getType)
+ }
+
+ @Test
+ def testBlobMapRoundtrip(): Unit = {
+ // Test map containing blobs at various nesting levels
+ val innerSchema = HoodieSchema.createRecord("nested", null, null,
util.Arrays.asList(
+ HoodieSchemaField.of("nested_long",
HoodieSchema.create(HoodieSchemaType.LONG)),
+ HoodieSchemaField.of("nested_blob", HoodieSchema.createBlob())))
+ val outerMap = HoodieSchema.createMap(innerSchema)
+
+ val fields = util.Arrays.asList(
+ HoodieSchemaField.of("simple_blobs_map",
HoodieSchema.createMap(HoodieSchema.createBlob())),
+ HoodieSchemaField.of("simple_nullable_blobs_map",
HoodieSchema.createMap(HoodieSchema.createNullable(HoodieSchema.createBlob()))),
+ HoodieSchemaField.of("nested_blobs_map", outerMap)
+ )
+ val originalSchema = HoodieSchema.createRecord("BlobMaps", "test", null,
fields)
+
+ // Roundtrip
+ val (sparkType, _) = HoodieSparkSchemaConverters.toSqlType(originalSchema)
+ val reconstructed = HoodieSparkSchemaConverters.toHoodieType(sparkType,
recordName = "BlobMaps")
+
+ // Verify simple map
+ val simpleField = reconstructed.getField("simple_blobs_map").get()
+ assertEquals(HoodieSchemaType.MAP, simpleField.schema.getType)
+ assertFalse(simpleField.schema.getValueType.isNullable)
+ assertEquals(HoodieSchemaType.BLOB,
simpleField.schema.getValueType.getNonNullType.getType)
+
+ // Verify simple nullable map
+ val nullableField =
reconstructed.getField("simple_nullable_blobs_map").get()
+ assertEquals(HoodieSchemaType.MAP, nullableField.schema.getType)
+ assertTrue(nullableField.schema.getValueType.isNullable)
+ assertEquals(HoodieSchemaType.BLOB,
nullableField.schema.getValueType.getNonNullType.getType)
+
+ // Verify nested map
+ val nestedField = reconstructed.getField("nested_blobs_map").get()
+ assertEquals(HoodieSchemaType.MAP, nestedField.schema.getType)
+ val nestedMapType = nestedField.schema.getValueType
+ assertEquals(HoodieSchemaType.RECORD, nestedMapType.getType)
+ assertEquals(HoodieSchemaType.BLOB,
nestedMapType.getField("nested_blob").get.schema.getType)
+ }
+
+ /**
+ * Validates the content of the blob fields to ensure the fields match our
expectations.
+ * @param dataType the StructType containing the blob fields to validate
+ */
+ private def validateBlobFields(dataType: StructType): Unit = {
+ // storage_type is a non-null string field
+ val storageTypeField = dataType.fields.find(_.name ==
HoodieSchema.Blob.TYPE).get
+ assertEquals(DataTypes.StringType, storageTypeField.dataType)
+ assertFalse(storageTypeField.nullable)
+ // data is a nullable binary field
+ val dataField = dataType.fields.find(_.name ==
HoodieSchema.Blob.INLINE_DATA_FIELD).get
+ assertEquals(DataTypes.BinaryType, dataField.dataType)
+ assertTrue(dataField.nullable)
+ // reference is a nullable struct field
+ val referenceField = dataType.fields.find(_.name ==
HoodieSchema.Blob.EXTERNAL_REFERENCE).get
+ assertEquals(new StructType(Array[StructField](
+ StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH,
DataTypes.StringType, nullable = false),
+ StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET,
DataTypes.LongType, nullable = true),
+ StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH,
DataTypes.LongType, nullable = true),
+ StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED,
DataTypes.BooleanType, nullable = false)
+ )), referenceField.dataType)
+ assertTrue(referenceField.nullable)
+ }
}
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestSparkSchemaUtils.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestSparkSchemaUtils.java
index 471c663cb6d4..0026fb71a417 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestSparkSchemaUtils.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestSparkSchemaUtils.java
@@ -47,6 +47,10 @@ import java.util.Arrays;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestSparkSchemaUtils {
private final SparkSqlParser parser = createSqlParser();
@@ -154,4 +158,40 @@ public class TestSparkSchemaUtils {
StructType convertedSparkSchemaWithNullable = (StructType)
StructType.fromJson(sparkSchemaWithNullableJson);
assertEquals(sparkSchemaWithNullable.json(),
convertedSparkSchemaWithNullable.json());
}
+
+ @Test
+ public void testConvertSchemaWithBlobField() {
+ // Tests that schema with BLOB field converts to Spark schema JSON
correctly
+ // Create a simple record with 1 non-BLOB field and 1 BLOB field
+ HoodieSchema schema = HoodieSchema.createRecord("root", null, null, false,
Arrays.asList(
+ HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT),
null, null),
+ HoodieSchemaField.of("blob_data", HoodieSchema.createBlob(), null,
null)
+ ));
+
+ // Convert to Spark JSON
+ String sparkJson = SparkSchemaUtils.convertToSparkSchemaJson(schema);
+
+ // Validate JSON is valid and parseable
+ assertNotNull(sparkJson);
+ assertFalse(sparkJson.isEmpty());
+
+ StructType sparkSchema = (StructType) StructType.fromJson(sparkJson);
+
+ // Verify basic structure: 2 fields
+ assertEquals(2, sparkSchema.fields().length);
+ assertEquals("id", sparkSchema.fields()[0].name());
+ assertEquals("blob_data", sparkSchema.fields()[1].name());
+
+ // Verify BLOB field converted to struct (not primitive)
+ assertInstanceOf(StructType.class, sparkSchema.fields()[1].dataType());
+
+ // Verify metadata attached to blob field
+ Metadata blobMetadata = sparkSchema.fields()[1].metadata();
+ assertTrue(blobMetadata.contains(HoodieSchema.TYPE_METADATA_FIELD));
+ assertEquals(HoodieSchemaType.BLOB.name(),
blobMetadata.getString(HoodieSchema.TYPE_METADATA_FIELD));
+
+ // Verify BLOB structure has 3 fields (type, data, reference)
+ StructType blobStruct = (StructType) sparkSchema.fields()[1].dataType();
+ assertEquals(3, blobStruct.fields().length);
+ }
}
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkSchemaUtils.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkSchemaUtils.java
index ff14b3a46f87..51198f1ae7c5 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkSchemaUtils.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkSchemaUtils.java
@@ -19,6 +19,7 @@
package org.apache.hudi.sync.common.util;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaType;
/**
 * Convert the Hoodie schema to a Spark schema JSON string.
@@ -28,10 +29,14 @@ import org.apache.hudi.common.schema.HoodieSchema;
public class SparkSchemaUtils {
public static String convertToSparkSchemaJson(HoodieSchema schema) {
- String fieldsJsonString = schema.getFields().stream().map(field ->
- "{\"name\":\"" + field.name() + "\",\"type\":" +
convertFieldType(field.getNonNullSchema())
- + ",\"nullable\":" + field.isNullable() + ",\"metadata\":{}}")
- .reduce((a, b) -> a + "," + b).orElse("");
+ String fieldsJsonString = schema.getFields().stream().map(field -> {
+ String metadata = "{}";
+ if (field.getNonNullSchema().isBlobField()) {
+ metadata = String.format("{\"%s\":\"%s\"}",
HoodieSchema.TYPE_METADATA_FIELD, HoodieSchemaType.BLOB.name());
+ }
+ return "{\"name\":\"" + field.name() + "\",\"type\":" +
convertFieldType(field.getNonNullSchema())
+ + ",\"nullable\":" + field.isNullable() + ",\"metadata\":" +
metadata + "}";
+ }).reduce((a, b) -> a + "," + b).orElse("");
return "{\"type\":\"struct\",\"fields\":[" + fieldsJsonString + "]}";
}
@@ -81,6 +86,7 @@ public class SparkSchemaUtils {
+ ",\"valueType\":" + convertFieldType(valueType)
+ ",\"valueContainsNull\":" + valueOptional + "}";
case RECORD:
+ case BLOB:
return convertToSparkSchemaJson(fieldSchema);
default:
throw new UnsupportedOperationException("Cannot convert " +
fieldSchema.getType() + " to spark sql type");
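
For illustration, here is a minimal sketch (not part of this patch) of how the blob-aware `convertToSparkSchemaJson` above could be exercised. The builder calls mirror the ones used in `TestSparkSchemaUtils`; the JSON shape in the comment is inferred from that test, and the exact metadata key depends on the value of `HoodieSchema.TYPE_METADATA_FIELD`.

```java
// Hypothetical usage sketch; the class and main method exist only for illustration.
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.sync.common.util.SparkSchemaUtils;

import java.util.Arrays;

public class BlobSchemaJsonExample {
  public static void main(String[] args) {
    // Record with one primitive field and one BLOB field, mirroring the test above.
    HoodieSchema schema = HoodieSchema.createRecord("root", null, null, false, Arrays.asList(
        HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT), null, null),
        HoodieSchemaField.of("blob_data", HoodieSchema.createBlob(), null, null)));

    // The BLOB field is serialized like a RECORD (a three-field struct), with the
    // Hudi type marker embedded in its Spark metadata, roughly:
    //   {"name":"blob_data",
    //    "type":{"type":"struct","fields":[ ...type/data/reference... ]},
    //    "nullable":false,
    //    "metadata":{"<HoodieSchema.TYPE_METADATA_FIELD>":"BLOB"}}
    System.out.println(SparkSchemaUtils.convertToSparkSchemaJson(schema));
  }
}
```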
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
index fa09c8b5eb94..1e0a334c1579 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
@@ -41,6 +41,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.avro.HoodieSparkSchemaConverters;
+import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -110,7 +111,7 @@ public class TestSourceFormatAdapter {
TypedProperties typedProperties = new TypedProperties();
typedProperties.put(HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(),
true);
typedProperties.put(HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(),
"__");
- setupJsonSource(rdd,
HoodieSparkSchemaConverters.toHoodieType(sanitizedSchema, false, "record", ""));
+ setupJsonSource(rdd,
HoodieSparkSchemaConverters.toHoodieType(sanitizedSchema, false, "record", "",
Metadata.empty()));
SourceFormatAdapter sourceFormatAdapter = new
SourceFormatAdapter(testJsonDataSource, Option.empty(),
Option.of(typedProperties));
return sourceFormatAdapter.fetchNewDataInRowFormat(Option.of(new
StreamerCheckpointV2(DUMMY_CHECKPOINT)), 10L);
}
diff --git a/rfc/rfc-100/rfc-100.md b/rfc/rfc-100/rfc-100.md
index 5c129d763727..e66ab97c57a4 100644
--- a/rfc/rfc-100/rfc-100.md
+++ b/rfc/rfc-100/rfc-100.md
@@ -116,10 +116,6 @@ We will add a `blob` type to the HoodieSchema that
encapsulates both inline and
```
The `managed` flag will be used by the cleaner to determine if an out-of-line
blob should be deleted when cleaning up old file slices. This allows users to
point to existing files without Hudi deleting them.
-#### Restrictions
-We will not support adding blobs as Map values or Array elements in the
initial implementation to reduce the complexity of the implementation for
reading and managing blob references.
-Blobs can still be nested within Structs/Records to allow for complex schemas.
-
### 2. Reader
Readers will be updated to allow for lazy loading of the blob data, even when
it is inline. This will help reduce memory pressure during shuffles in
distributed query engines like Spark.
The readers will return a reference to the blob data in the form of a path,
position, and size. This applies for both inline and out-of-line storage.
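
To make that contract concrete, here is a minimal, hypothetical sketch of the reference such a reader could hand back; the class and accessor names are illustrative only and simply mirror the path/position/size description above (for out-of-line blobs these would correspond to the `external_path`, `offset`, and `length` fields of the schema's `reference` struct).

```java
// Hypothetical reader-side handle; not part of this patch.
public final class BlobDataReference {
  private final String path;   // file holding the bytes (data file for inline blobs, external file otherwise)
  private final long position; // byte offset of the blob within that file
  private final long size;     // length of the blob in bytes

  public BlobDataReference(String path, long position, long size) {
    this.path = path;
    this.position = position;
    this.size = size;
  }

  public String getPath() {
    return path;
  }

  public long getPosition() {
    return position;
  }

  public long getSize() {
    return size;
  }
}
```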