voonhous commented on code in PR #17751:
URL: https://github.com/apache/hudi/pull/17751#discussion_r2719623369
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -1343,6 +1480,263 @@ public enum TimePrecision {
MICROS
}
+ /**
+ * Custom Avro LogicalType for Variant.
+ * This logical type is applied to RECORD schemas that represent Variant
types.
+ *
+ * <p>This is a singleton type - use {@link #variant()} to get the
instance.</p>
+ */
+ public static class VariantLogicalType extends LogicalType {
+
+ private static final String VARIANT_LOGICAL_TYPE_NAME = "variant";
+ // Eager initialization of singleton
+ private static final VariantLogicalType INSTANCE = new
VariantLogicalType();
+
+ private VariantLogicalType() {
+ super(VariantLogicalType.VARIANT_LOGICAL_TYPE_NAME);
+ }
+
+ /**
+ * Returns the singleton instance of VariantLogicalType.
+ *
+ * @return the Variant logical type instance
+ */
+ public static VariantLogicalType variant() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void validate(Schema schema) {
+ super.validate(schema);
+ if (schema.getType() != Schema.Type.RECORD) {
+ throw new IllegalArgumentException("Variant logical type can only be
applied to RECORD schemas, got: " + schema.getType());
+ }
+ }
+ }
+
+ /**
+ * Factory for creating VariantLogicalType instances.
+ */
+ private static class VariantLogicalTypeFactory implements
LogicalTypes.LogicalTypeFactory {
+ @Override
+ public LogicalType fromSchema(Schema schema) {
+ return VariantLogicalType.variant();
+ }
+
+ @Override
+ public String getTypeName() {
+ return VariantLogicalType.VARIANT_LOGICAL_TYPE_NAME;
+ }
+ }
+
  /**
   * Variant schema type representing semi-structured data that can store values of different types.
   *
   * <p>According to the Parquet specification, a Variant is represented as a record/group with the
   * following fields:
   * <ul>
   *   <li>metadata: binary field containing the Variant metadata component (always required)</li>
   *   <li>value: binary field containing the Variant value component (required for unshredded,
   *       optional, i.e. nullable, for shredded)</li>
   *   <li>typed_value: optional field for shredded variants that stores values matching a specific
   *       type (the type varies)</li>
   * </ul>
   *
   * <p>This implementation supports both:
   * <ul>
   *   <li><b>Unshredded Variant</b>: metadata (required) and value (required) fields.</li>
   *   <li><b>Shredded Variant</b>: metadata (required), value (optional), and typed_value (optional).</li>
   * </ul>
   *
   * <p>Backwards compatibility:
   * <ul>
   *   <li>Old Hudi versions will read it as a regular record with byte array fields.</li>
   *   <li>New Hudi versions can detect it as a Variant type via the Avro LogicalType mechanism.</li>
   * </ul>
   */
+ public static class Variant extends HoodieSchema {
+
+ private static final String VARIANT_DEFAULT_NAME = "variant";
+ private static final String VARIANT_METADATA_FIELD = "metadata";
+ private static final String VARIANT_VALUE_FIELD = "value";
+ private static final String VARIANT_TYPED_VALUE_FIELD = "typed_value";
+
+ private final boolean isShredded;
+ private final Option<HoodieSchema> typedValueSchema;
+
+ /**
+ * Creates a new Variant HoodieSchema wrapping the given Avro schema.
+ *
+ * @param avroSchema the Avro schema to wrap, must be a valid Variant
schema
+ * @throws IllegalArgumentException if avroSchema is null or not a valid
Variant schema
+ */
+ private Variant(Schema avroSchema) {
+ super(avroSchema);
+ this.isShredded = determineIfShredded(avroSchema);
+ this.typedValueSchema = extractTypedValueSchema(avroSchema);
+ validateVariantSchema(avroSchema);
+ }
+
+ /**
+ * Determines if the variant schema is shredded based on the value field
nullability or presence of typed_value.
+ *
+ * @param avroSchema the schema to check
+ * @return true if the value field is nullable or typed_value exists
(shredded), false otherwise (unshredded)
+ */
+ private boolean determineIfShredded(Schema avroSchema) {
+ // Check if typed_value field exists
+ Schema.Field typedValueField =
avroSchema.getField(VARIANT_TYPED_VALUE_FIELD);
+ if (typedValueField != null) {
+ return true;
+ }
+
+ // Check if value field is nullable
+ Schema.Field valueField = avroSchema.getField(VARIANT_VALUE_FIELD);
+ if (valueField == null) {
+ return false;
+ }
+ Schema valueSchema = valueField.schema();
+ if (valueSchema.getType() == Schema.Type.UNION) {
+ return valueSchema.getTypes().stream().anyMatch(s -> s.getType() ==
Schema.Type.NULL);
+ }
+ return false;
+ }
+
+ /**
+ * Extracts the typed_value field schema if present.
+ *
+ * @param avroSchema the schema to extract from
+ * @return Option containing the typed_value schema, or Option.empty() if
not present
+ */
+ private Option<HoodieSchema> extractTypedValueSchema(Schema avroSchema) {
+ Schema.Field typedValueField =
avroSchema.getField(VARIANT_TYPED_VALUE_FIELD);
+ if (typedValueField != null) {
+ return
Option.of(HoodieSchema.fromAvroSchema(typedValueField.schema()));
+ }
+ return Option.empty();
+ }
+
+ /**
+ * Validates that the given Avro schema conforms to the Variant
specification.
+ *
+ * @param avroSchema the schema to validate
+ * @throws IllegalArgumentException if the schema is not a valid Variant
schema
+ */
+ private void validateVariantSchema(Schema avroSchema) {
+ if (avroSchema.getType() != Schema.Type.RECORD) {
+ throw new IllegalArgumentException("Variant schema must be a RECORD
type, got: " + avroSchema.getType());
+ }
+
+ // Check for metadata field (always required)
+ Schema.Field metadataField = avroSchema.getField(VARIANT_METADATA_FIELD);
+ if (metadataField == null) {
+ throw new IllegalArgumentException("Variant schema must have a '" +
VARIANT_METADATA_FIELD + "' field");
+ }
+ if (metadataField.schema().getType() != Schema.Type.BYTES) {
+ throw new IllegalArgumentException("Variant metadata field must be
BYTES type, got: " + metadataField.schema().getType());
+ }
+
+ // Check for value field
+ Schema.Field valueField = avroSchema.getField(VARIANT_VALUE_FIELD);
+ if (valueField == null) {
+ throw new IllegalArgumentException("Variant schema must have a '" +
VARIANT_VALUE_FIELD + "' field");
+ }
+
+ Schema valueSchema = valueField.schema();
+ if (isShredded) {
+ // Shredded: value should be nullable (union with null and bytes)
+ if (valueSchema.getType() == Schema.Type.UNION) {
+ boolean hasNull = valueSchema.getTypes().stream().anyMatch(s ->
s.getType() == Schema.Type.NULL);
+ boolean hasBytes = valueSchema.getTypes().stream().anyMatch(s ->
s.getType() == Schema.Type.BYTES);
+ if (!hasNull || !hasBytes) {
+ throw new IllegalArgumentException("Shredded Variant value field
should be a union of [null, bytes]");
+ }
+ } else if (valueSchema.getType() != Schema.Type.BYTES) {
+ // If not a union, it should at least be bytes (some shredded
variants may have non-null value)
+ throw new IllegalArgumentException("Shredded Variant value field
must be BYTES or nullable BYTES, got: " + valueSchema.getType());
+ }
+ } else {
+ // Unshredded: value must be non-nullable bytes
+ if (valueSchema.getType() != Schema.Type.BYTES) {
+ throw new IllegalArgumentException("Unshredded Variant value field
must be BYTES type, got: " + valueSchema.getType());
+ }
+ }
+ }
+
+ /**
+ * Checks if the given Avro schema is a Variant schema.
+ * This checks for the Variant logical type.
+ *
+ * @param avroSchema the schema to check
+ * @return true if the schema has a Variant logical type
+ */
+ public static boolean isVariantSchema(Schema avroSchema) {
Review Comment:
Pre-emptively including this, as I thought we'd need it for type
comparison. I'm okay with moving this to `TestHoodieSchema` or a similar test
scope for now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]