the-other-tim-brown commented on code in PR #17751:
URL: https://github.com/apache/hudi/pull/17751#discussion_r2718251071


##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -1343,6 +1480,263 @@ public enum TimePrecision {
     MICROS
   }
 
+  /**
+   * Custom Avro LogicalType for Variant.
+   * This logical type is applied to RECORD schemas that represent Variant 
types.
+   *
+   * <p>This is a singleton type - use {@link #variant()} to get the 
instance.</p>
+   */
+  public static class VariantLogicalType extends LogicalType {
+
+    private static final String VARIANT_LOGICAL_TYPE_NAME = "variant";
+    // Eager initialization of singleton
+    private static final VariantLogicalType INSTANCE = new 
VariantLogicalType();
+
+    private VariantLogicalType() {
+      super(VariantLogicalType.VARIANT_LOGICAL_TYPE_NAME);
+    }
+
+    /**
+     * Returns the singleton instance of VariantLogicalType.
+     *
+     * @return the Variant logical type instance
+     */
+    public static VariantLogicalType variant() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      if (schema.getType() != Schema.Type.RECORD) {
+        throw new IllegalArgumentException("Variant logical type can only be 
applied to RECORD schemas, got: " + schema.getType());
+      }
+    }
+  }
+
+  /**
+   * Factory for creating VariantLogicalType instances.
+   */
+  private static class VariantLogicalTypeFactory implements 
LogicalTypes.LogicalTypeFactory {
+    @Override
+    public LogicalType fromSchema(Schema schema) {
+      return VariantLogicalType.variant();
+    }
+
+    @Override
+    public String getTypeName() {
+      return VariantLogicalType.VARIANT_LOGICAL_TYPE_NAME;
+    }
+  }
+
+  /**
+   * Variant schema type representing semi-structured data that can store 
values of different types.
+   *
+   * <p>According to the Parquet specification, a Variant is represented as a 
record/group with binary fields:
+   * <ul>
+   *   <li>metadata: Binary field containing the Variant metadata component 
(always required)</li>
+   *   <li>value: Binary field containing the Variant value component 
(required for unshredded, optional for shredded)</li>
+   *   <li>typed_value: Optional field for shredded variants, stores values 
matching a specific type (type varies)</li>
+   * </ul>
+   * </p>
+   *
+   * <p>This implementation supports both:</p>
+   * <ul>
+   *   <li><b>Unshredded Variant</b>: metadata (required) and value (required) 
fields.</li>
+   *   <li><b>Shredded Variant</b>: metadata (required), value (optional), and 
typed_value (optional).</li>
+   * </ul>
+   *
+   * <p>Backwards compatibility:</p>
+   * <ul>
+   *   <li>Old Hudi versions will read it as a regular record with byte array 
fields</li>
+   *   <li>New Hudi versions can detect it as a Variant type via the Avro 
LogicalType mechanism</li>
+   * </ul>
+   */
+  public static class Variant extends HoodieSchema {
+
+    private static final String VARIANT_DEFAULT_NAME = "variant";
+    private static final String VARIANT_METADATA_FIELD = "metadata";
+    private static final String VARIANT_VALUE_FIELD = "value";
+    private static final String VARIANT_TYPED_VALUE_FIELD = "typed_value";
+
+    private final boolean isShredded;
+    private final Option<HoodieSchema> typedValueSchema;
+
+    /**
+     * Creates a new Variant HoodieSchema wrapping the given Avro schema.
+     *
+     * @param avroSchema the Avro schema to wrap, must be a valid Variant 
schema
+     * @throws IllegalArgumentException if avroSchema is null or not a valid 
Variant schema
+     */
+    private Variant(Schema avroSchema) {
+      super(avroSchema);
+      this.isShredded = determineIfShredded(avroSchema);
+      this.typedValueSchema = extractTypedValueSchema(avroSchema);
+      validateVariantSchema(avroSchema);
+    }
+
+    /**
+     * Determines if the variant schema is shredded based on the value field 
nullability or presence of typed_value.
+     *
+     * @param avroSchema the schema to check
+     * @return true if the value field is nullable or typed_value exists 
(shredded), false otherwise (unshredded)
+     */
+    private boolean determineIfShredded(Schema avroSchema) {
+      // Check if typed_value field exists
+      Schema.Field typedValueField = 
avroSchema.getField(VARIANT_TYPED_VALUE_FIELD);
+      if (typedValueField != null) {
+        return true;
+      }
+
+      // Check if value field is nullable
+      Schema.Field valueField = avroSchema.getField(VARIANT_VALUE_FIELD);
+      if (valueField == null) {
+        return false;
+      }
+      Schema valueSchema = valueField.schema();
+      if (valueSchema.getType() == Schema.Type.UNION) {
+        return valueSchema.getTypes().stream().anyMatch(s -> s.getType() == 
Schema.Type.NULL);
+      }
+      return false;
+    }
+
+    /**
+     * Extracts the typed_value field schema if present.
+     *
+     * @param avroSchema the schema to extract from
+     * @return Option containing the typed_value schema, or Option.empty() if 
not present
+     */
+    private Option<HoodieSchema> extractTypedValueSchema(Schema avroSchema) {
+      Schema.Field typedValueField = 
avroSchema.getField(VARIANT_TYPED_VALUE_FIELD);
+      if (typedValueField != null) {
+        return 
Option.of(HoodieSchema.fromAvroSchema(typedValueField.schema()));
+      }
+      return Option.empty();
+    }
+
+    /**
+     * Validates that the given Avro schema conforms to the Variant 
specification.
+     *
+     * @param avroSchema the schema to validate
+     * @throws IllegalArgumentException if the schema is not a valid Variant 
schema
+     */
+    private void validateVariantSchema(Schema avroSchema) {
+      if (avroSchema.getType() != Schema.Type.RECORD) {
+        throw new IllegalArgumentException("Variant schema must be a RECORD 
type, got: " + avroSchema.getType());
+      }
+
+      // Check for metadata field (always required)
+      Schema.Field metadataField = avroSchema.getField(VARIANT_METADATA_FIELD);
+      if (metadataField == null) {
+        throw new IllegalArgumentException("Variant schema must have a '" + 
VARIANT_METADATA_FIELD + "' field");
+      }
+      if (metadataField.schema().getType() != Schema.Type.BYTES) {
+        throw new IllegalArgumentException("Variant metadata field must be 
BYTES type, got: " + metadataField.schema().getType());
+      }
+
+      // Check for value field
+      Schema.Field valueField = avroSchema.getField(VARIANT_VALUE_FIELD);
+      if (valueField == null) {
+        throw new IllegalArgumentException("Variant schema must have a '" + 
VARIANT_VALUE_FIELD + "' field");
+      }
+
+      Schema valueSchema = valueField.schema();
+      if (isShredded) {
+        // Shredded: value should be nullable (union with null and bytes)
+        if (valueSchema.getType() == Schema.Type.UNION) {
+          boolean hasNull = valueSchema.getTypes().stream().anyMatch(s -> 
s.getType() == Schema.Type.NULL);
+          boolean hasBytes = valueSchema.getTypes().stream().anyMatch(s -> 
s.getType() == Schema.Type.BYTES);
+          if (!hasNull || !hasBytes) {
+            throw new IllegalArgumentException("Shredded Variant value field 
should be a union of [null, bytes]");
+          }
+        } else if (valueSchema.getType() != Schema.Type.BYTES) {
+          // If not a union, it should at least be bytes (some shredded 
variants may have non-null value)
+          throw new IllegalArgumentException("Shredded Variant value field 
must be BYTES or nullable BYTES, got: " + valueSchema.getType());
+        }
+      } else {
+        // Unshredded: value must be non-nullable bytes
+        if (valueSchema.getType() != Schema.Type.BYTES) {
+          throw new IllegalArgumentException("Unshredded Variant value field 
must be BYTES type, got: " + valueSchema.getType());
+        }
+      }
+    }
+
+    /**
+     * Checks if the given Avro schema is a Variant schema.
+     * This checks for the Variant logical type.
+     *
+     * @param avroSchema the schema to check
+     * @return true if the schema has a Variant logical type
+     */
+    public static boolean isVariantSchema(Schema avroSchema) {

Review Comment:
   `isVariantSchema` is only used in tests so far; do we still need it?



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -921,6 +1055,9 @@ public HoodieSchema parse(InputStream inputStream) {
         return fromAvroSchema(avroSchema);
       } catch (IOException e) {
         throw new HoodieIOException("Failed to parse schema from InputStream", 
e);
+      } catch (IllegalArgumentException e) {
+        // Wrap validation exceptions to preserve the detailed error message
+        throw new HoodieAvroSchemaException(e.getMessage(), e);

Review Comment:
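   This will duplicate the error message in the stack trace. The wrapper reuses
   `e.getMessage()`, so the same text is printed twice: once for the
   `HoodieAvroSchemaException` and again on the `Caused by: IllegalArgumentException` line.
   Can we add a more meaningful message here, e.g. one stating that the parsed schema
   failed Variant validation?
   
   The same applies to the change above.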



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -1343,6 +1480,263 @@ public enum TimePrecision {
     MICROS
   }
 
+  /**
+   * Custom Avro LogicalType for Variant.
+   * This logical type is applied to RECORD schemas that represent Variant 
types.
+   *
+   * <p>This is a singleton type - use {@link #variant()} to get the 
instance.</p>
+   */
+  public static class VariantLogicalType extends LogicalType {

Review Comment:
   Can we make this package-private?



##########
hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java:
##########
@@ -1042,4 +1044,526 @@ public void 
testParseLogicalTypesCreatesCorrectSubclasses() {
     HoodieSchema.Time tmMicros = (HoodieSchema.Time) parsedTimeMicros;
     assertEquals(HoodieSchema.TimePrecision.MICROS, tmMicros.getPrecision());
   }
+
+  // ==================== Variant Type Tests ====================
+
+  @Test
+  public void testCreateUnshreddedVariant() {
+    HoodieSchema.Variant variantSchema = HoodieSchema.createVariant();
+
+    assertNotNull(variantSchema);
+    assertInstanceOf(HoodieSchema.Variant.class, variantSchema);
+    assertEquals(HoodieSchemaType.VARIANT, variantSchema.getType());
+    assertEquals("variant", variantSchema.getName());
+    assertFalse(variantSchema.isShredded());
+    assertFalse(variantSchema.getTypedValueField().isPresent());
+
+    // Verify fields
+    List<HoodieSchemaField> fields = variantSchema.getFields();
+    assertEquals(2, fields.size());
+    assertEquals("metadata", fields.get(0).name());
+    assertEquals("value", fields.get(1).name());
+
+    // Verify field types
+    assertEquals(HoodieSchemaType.BYTES, fields.get(0).schema().getType());
+    assertEquals(HoodieSchemaType.BYTES, fields.get(1).schema().getType());
+
+    // Value field should be non-nullable for unshredded
+    assertFalse(fields.get(1).schema().isNullable());
+  }
+
+  @Test
+  public void testCreateUnshreddedVariantWithCustomName() {
+    final String name = "my_variant";
+    final String namespace = "org.apache.hoodie";
+    final String doc = "Custom variant schema";
+    HoodieSchema.Variant variantSchema = HoodieSchema.createVariant(name, 
namespace, doc);
+
+    assertNotNull(variantSchema);
+    assertEquals(name, variantSchema.getAvroSchema().getName());
+    assertEquals(namespace, variantSchema.getAvroSchema().getNamespace());
+    assertEquals(doc, variantSchema.getAvroSchema().getDoc());
+    assertFalse(variantSchema.isShredded());
+  }
+
+  @Test
+  public void testCreateShreddedVariant() {
+    final String recordName = "TypedValue";
+    final String fieldName = "data";
+    HoodieSchema typedValueSchema = HoodieSchema.createRecord(recordName, 
null, null,
+        Collections.singletonList(HoodieSchemaField.of(fieldName, 
HoodieSchema.create(HoodieSchemaType.STRING))));
+
+    HoodieSchema.Variant variantSchema = 
HoodieSchema.createVariantShredded(typedValueSchema);
+
+    assertNotNull(variantSchema);
+    assertInstanceOf(HoodieSchema.Variant.class, variantSchema);
+    assertEquals(HoodieSchemaType.VARIANT, variantSchema.getType());
+    assertTrue(variantSchema.isShredded());
+    assertTrue(variantSchema.getTypedValueField().isPresent());
+
+    // Verify fields
+    List<HoodieSchemaField> fields = variantSchema.getFields();
+    assertEquals(3, fields.size());
+    assertEquals("metadata", fields.get(0).name());
+    assertEquals("value", fields.get(1).name());
+    assertEquals("typed_value", fields.get(2).name());
+
+    // Value field should be nullable for shredded
+    assertTrue(fields.get(1).schema().isNullable());
+
+    // Verify typed_value schema
+    HoodieSchema retrievedTypedValueSchema = 
variantSchema.getTypedValueField().get();
+    assertEquals(HoodieSchemaType.RECORD, retrievedTypedValueSchema.getType());
+  }
+
+  @Test
+  public void testCreateShreddedVariantWithoutTypedValue() {
+    HoodieSchema.Variant variantSchema = 
HoodieSchema.createVariantShredded(null);
+
+    assertNotNull(variantSchema);
+    assertTrue(variantSchema.isShredded());
+    assertFalse(variantSchema.getTypedValueField().isPresent());
+
+    // Verify fields should have metadata and nullable value, but no 
typed_value
+    List<HoodieSchemaField> fields = variantSchema.getFields();
+    assertEquals(2, fields.size());
+    assertEquals("metadata", fields.get(0).name());
+    assertEquals("value", fields.get(1).name());
+
+    // Value field should be nullable even without typed_value
+    assertTrue(fields.get(1).schema().isNullable());
+  }
+
+  @Test
+  public void testVariantLogicalTypeDetection() {
+    HoodieSchema.Variant variantSchema = HoodieSchema.createVariant();
+    Schema avroSchema = variantSchema.getAvroSchema();
+
+    // Verify logical type is set
+    assertNotNull(avroSchema.getLogicalType());
+    assertInstanceOf(HoodieSchema.VariantLogicalType.class, 
avroSchema.getLogicalType());
+    assertEquals("variant", avroSchema.getLogicalType().getName());
+
+    // Verify isVariantSchema detection
+    assertTrue(HoodieSchema.Variant.isVariantSchema(avroSchema));
+  }
+
+  @Test
+  public void testVariantRoundTripSerializationToJson() {
+    // Create unshredded variant
+    HoodieSchema.Variant originalVariant = HoodieSchema.createVariant();
+    String jsonSchema = originalVariant.toString();
+
+    // Parse back from JSON
+    HoodieSchema parsedSchema = HoodieSchema.parse(jsonSchema);
+
+    // Verify it's still detected as Variant
+    assertInstanceOf(HoodieSchema.Variant.class, parsedSchema);
+    HoodieSchema.Variant parsedVariant = (HoodieSchema.Variant) parsedSchema;
+    assertFalse(parsedVariant.isShredded());
+    assertEquals(HoodieSchemaType.VARIANT, parsedVariant.getType());
+
+    // Verify logical type is preserved
+    
assertTrue(HoodieSchema.Variant.isVariantSchema(parsedVariant.getAvroSchema()));
+  }
+
+  @Test
+  public void testVariantShreddedRoundTripSerializationToJson() {
+    // Create shredded variant with typed_value
+    HoodieSchema typedValueSchema = 
HoodieSchema.create(HoodieSchemaType.STRING);
+    HoodieSchema.Variant originalVariant = 
HoodieSchema.createVariantShredded(typedValueSchema);
+    String jsonSchema = originalVariant.toString();
+
+    // Parse back from JSON
+    HoodieSchema parsedSchema = HoodieSchema.parse(jsonSchema);
+
+    // Verify it's still detected as Variant
+    assertInstanceOf(HoodieSchema.Variant.class, parsedSchema);
+    HoodieSchema.Variant parsedVariant = (HoodieSchema.Variant) parsedSchema;
+    assertTrue(parsedVariant.isShredded());
+    assertTrue(parsedVariant.getTypedValueField().isPresent());
+    assertEquals(HoodieSchemaType.STRING, 
parsedVariant.getTypedValueField().get().getType());
+  }
+
+  @Test
+  public void testVariantBackwardsCompatibility() {
+    // Create a variant schema
+    HoodieSchema.Variant variantSchema = HoodieSchema.createVariant();
+    String jsonSchema = variantSchema.toString();
+
+    // Simulate old Hudi version by parsing without logical type support
+    // Old versions will see it as a regular record with byte array fields
+    Schema avroSchema = new Schema.Parser().parse(jsonSchema);
+
+    // Verify it can be read as a regular record
+    assertEquals(Schema.Type.RECORD, avroSchema.getType());
+    assertEquals(2, avroSchema.getFields().size());
+
+    // Verify fields are readable as bytes
+    Schema.Field metadataField = avroSchema.getField("metadata");
+    assertNotNull(metadataField);
+    assertEquals(Schema.Type.BYTES, metadataField.schema().getType());
+
+    Schema.Field valueField = avroSchema.getField("value");
+    assertNotNull(valueField);
+    assertEquals(Schema.Type.BYTES, valueField.schema().getType());
+  }
+
+  @Test
+  public void testVariantTypeInHoodieSchemaType() {
+    HoodieSchema.Variant variantSchema = HoodieSchema.createVariant();
+
+    // Verify type detection through HoodieSchemaType
+    HoodieSchemaType type = 
HoodieSchemaType.fromAvro(variantSchema.getAvroSchema());
+    assertEquals(HoodieSchemaType.VARIANT, type);
+    assertTrue(type.isComplex());
+  }
+
+  @Test
+  public void testVariantInNestedStructures() {
+    // Test Variant as field in a record
+    HoodieSchema.Variant variantSchema = HoodieSchema.createVariant();
+    HoodieSchemaField variantField = HoodieSchemaField.of("data", 
variantSchema, "Variant data field", null);
+    List<HoodieSchemaField> fields = Arrays.asList(
+        HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.LONG)),
+        variantField
+    );
+
+    HoodieSchema recordSchema = HoodieSchema.createRecord("RecordWithVariant", 
null, null, fields);
+
+    // Retrieve and verify the variant field
+    Option<HoodieSchemaField> retrievedField = recordSchema.getField("data");
+    assertTrue(retrievedField.isPresent());
+    assertInstanceOf(HoodieSchema.Variant.class, 
retrievedField.get().schema());
+
+    // Test Variant in array
+    HoodieSchema arrayOfVariants = HoodieSchema.createArray(variantSchema);
+    assertEquals(HoodieSchemaType.ARRAY, arrayOfVariants.getType());
+    assertInstanceOf(HoodieSchema.Variant.class, 
arrayOfVariants.getElementType());
+
+    // Test Variant in map
+    HoodieSchema mapOfVariants = HoodieSchema.createMap(variantSchema);
+    assertEquals(HoodieSchemaType.MAP, mapOfVariants.getType());
+    assertInstanceOf(HoodieSchema.Variant.class, mapOfVariants.getValueType());
+  }
+
+  @Test
+  public void testVariantFieldAccess() {
+    HoodieSchema.Variant variantSchema = HoodieSchema.createVariant();
+
+    // Test metadata field access
+    HoodieSchema metadataFieldSchema = variantSchema.getMetadataField();
+    assertNotNull(metadataFieldSchema);
+    assertEquals(HoodieSchemaType.BYTES, metadataFieldSchema.getType());
+
+    // Test value field access
+    HoodieSchema valueFieldSchema = variantSchema.getValueField();
+    assertNotNull(valueFieldSchema);
+    assertEquals(HoodieSchemaType.BYTES, valueFieldSchema.getType());
+
+    // Test typed_value field access (should be empty for unshredded)
+    assertFalse(variantSchema.getTypedValueField().isPresent());
+  }
+
+  @Test
+  public void testVariantSerialization() throws Exception {
+    HoodieSchema.Variant originalVariant = HoodieSchema.createVariant();
+
+    // Serialize
+    ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
+    try (ObjectOutputStream out = new ObjectOutputStream(byteOut)) {
+      out.writeObject(originalVariant);
+    }
+    byte[] bytesWritten = byteOut.toByteArray();
+
+    // Deserialize
+    HoodieSchema deserializedSchema;
+    try (ObjectInputStream in = new ObjectInputStream(new 
ByteArrayInputStream(bytesWritten))) {
+      deserializedSchema = (HoodieSchema) in.readObject();
+    }
+
+    assertEquals(originalVariant, deserializedSchema);
+    assertInstanceOf(HoodieSchema.Variant.class, deserializedSchema);
+    HoodieSchema.Variant deserializedVariant = (HoodieSchema.Variant) 
deserializedSchema;
+    assertFalse(deserializedVariant.isShredded());
+    assertEquals(HoodieSchemaType.VARIANT, deserializedVariant.getType());
+  }
+
+  @Test
+  public void testVariantShreddedSerialization() throws Exception {
+    HoodieSchema typedValueSchema = HoodieSchema.create(HoodieSchemaType.INT);
+    HoodieSchema.Variant originalVariant = 
HoodieSchema.createVariantShredded(typedValueSchema);
+
+    // Serialize
+    ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
+    try (ObjectOutputStream out = new ObjectOutputStream(byteOut)) {
+      out.writeObject(originalVariant);
+    }
+    byte[] bytesWritten = byteOut.toByteArray();
+
+    // Deserialize
+    HoodieSchema deserializedSchema;
+    try (ObjectInputStream in = new ObjectInputStream(new 
ByteArrayInputStream(bytesWritten))) {
+      deserializedSchema = (HoodieSchema) in.readObject();
+    }
+
+    assertEquals(originalVariant, deserializedSchema);
+    assertInstanceOf(HoodieSchema.Variant.class, deserializedSchema);
+    HoodieSchema.Variant deserializedVariant = (HoodieSchema.Variant) 
deserializedSchema;
+    assertTrue(deserializedVariant.isShredded());
+    assertTrue(deserializedVariant.getTypedValueField().isPresent());
+    assertEquals(HoodieSchemaType.INT, 
deserializedVariant.getTypedValueField().get().getType());
+  }
+
+  @Test
+  public void testVariantEquality() {
+    HoodieSchema.Variant variant1 = HoodieSchema.createVariant();
+    HoodieSchema.Variant variant2 = HoodieSchema.createVariant();
+
+    assertEquals(variant1, variant2);
+    assertEquals(variant1.hashCode(), variant2.hashCode());
+
+    // Different shredding status should result in different schemas
+    HoodieSchema.Variant shreddedVariant = 
HoodieSchema.createVariantShredded(null);
+    // Note: equality depends on underlying Avro schema, which will be 
different due to nullable value field in shredded variant
+    assertNotEquals(shreddedVariant, variant1);
+    // Not really required since variant1 == variant2 is established
+    assertNotEquals(shreddedVariant, variant2); 
+  }
+
+  @Test
+  public void testVariantIsNotDetectedForRegularRecord() {
+    // Create a regular record without variant logical type
+    HoodieSchemaField metadataField = HoodieSchemaField.of("metadata", 
HoodieSchema.create(HoodieSchemaType.BYTES));
+    HoodieSchemaField valueField = HoodieSchemaField.of("value", 
HoodieSchema.create(HoodieSchemaType.BYTES));
+    List<HoodieSchemaField> fields = Arrays.asList(metadataField, valueField);
+
+    HoodieSchema regularRecord = HoodieSchema.createRecord("NotAVariant", 
null, null, fields);
+
+    // Should not be detected as Variant
+    
assertFalse(HoodieSchema.Variant.isVariantSchema(regularRecord.getAvroSchema()));
+    assertEquals(HoodieSchemaType.RECORD, regularRecord.getType());
+    assertFalse(regularRecord instanceof HoodieSchema.Variant);
+  }
+
+  @Test
+  public void testParseVariantFromJsonWithLogicalType() {
+    // JSON representation of a Variant schema
+    String variantJson = "{"
+        + "\"type\":\"record\","
+        + "\"name\":\"variant\","
+        + "\"logicalType\":\"variant\","
+        + "\"fields\":["
+        + "  {\"name\":\"metadata\",\"type\":\"bytes\"},"
+        + "  {\"name\":\"value\",\"type\":\"bytes\"}"
+        + "]"
+        + "}";
+
+    HoodieSchema parsedSchema = HoodieSchema.parse(variantJson);
+
+    assertInstanceOf(HoodieSchema.Variant.class, parsedSchema);
+    HoodieSchema.Variant parsedVariant = (HoodieSchema.Variant) parsedSchema;
+    assertFalse(parsedVariant.isShredded());
+    assertEquals(HoodieSchemaType.VARIANT, parsedVariant.getType());
+  }
+
+  @Test
+  public void testInvalidVariantMissingMetadataField() {
+    // Create a record without metadata field
+    String invalidVariantJson = "{"
+        + "\"type\":\"record\","
+        + "\"name\":\"variant\","
+        + "\"logicalType\":\"variant\","
+        + "\"fields\":["
+        + "  {\"name\":\"value\",\"type\":\"bytes\"}"
+        + "]"
+        + "}";
+
+    HoodieAvroSchemaException exception = 
assertThrows(HoodieAvroSchemaException.class,
+        () -> HoodieSchema.parse(invalidVariantJson));
+    assertTrue(exception.getMessage().contains("metadata"));
+  }
+
+  @Test
+  public void testInvalidVariantMissingValueField() {
+    // Create a record without value field
+    String invalidVariantJson = "{"
+        + "\"type\":\"record\","
+        + "\"name\":\"variant\","
+        + "\"logicalType\":\"variant\","
+        + "\"fields\":["
+        + "  {\"name\":\"metadata\",\"type\":\"bytes\"}"
+        + "]"
+        + "}";
+
+    HoodieAvroSchemaException exception = 
assertThrows(HoodieAvroSchemaException.class,
+        () -> HoodieSchema.parse(invalidVariantJson));
+    assertTrue(exception.getMessage().contains("value"));
+  }
+
+  @Test
+  public void testInvalidVariantWrongMetadataFieldType() {
+    // Create a variant with metadata as STRING instead of BYTES
+    String invalidVariantJson = "{"
+        + "\"type\":\"record\","
+        + "\"name\":\"variant\","
+        + "\"logicalType\":\"variant\","
+        + "\"fields\":["
+        + "  {\"name\":\"metadata\",\"type\":\"string\"},"
+        + "  {\"name\":\"value\",\"type\":\"bytes\"}"
+        + "]"
+        + "}";
+
+    HoodieAvroSchemaException exception = 
assertThrows(HoodieAvroSchemaException.class,
+        () -> HoodieSchema.parse(invalidVariantJson));
+    assertTrue(exception.getMessage().contains("metadata") && 
exception.getMessage().contains("BYTES"));
+  }
+
+  @Test
+  public void testInvalidVariantWrongValueFieldType() {
+    // Create a variant with value as STRING instead of BYTES
+    String invalidVariantJson = "{"
+        + "\"type\":\"record\","
+        + "\"name\":\"variant\","
+        + "\"logicalType\":\"variant\","
+        + "\"fields\":["
+        + "  {\"name\":\"metadata\",\"type\":\"bytes\"},"
+        + "  {\"name\":\"value\",\"type\":\"string\"}"
+        + "]"
+        + "}";
+
+    HoodieAvroSchemaException exception = 
assertThrows(HoodieAvroSchemaException.class,
+        () -> HoodieSchema.parse(invalidVariantJson));
+    assertTrue(exception.getMessage().contains("value") && 
exception.getMessage().contains("BYTES"));
+  }
+
+  @Test
+  public void testVariantHelperMethods() {
+    // Test hasFields() helper methods
+    HoodieSchema.Variant variantSchema = HoodieSchema.createVariant();
+
+    assertTrue(variantSchema.hasFields());
+    assertEquals(HoodieSchemaType.VARIANT, variantSchema.getType());
+
+    // Test that non-variant schemas return false
+    HoodieSchema stringSchema = HoodieSchema.create(HoodieSchemaType.STRING);
+    assertFalse(stringSchema.hasFields());
+  }
+
+  @Test
+  public void testVariantFieldAccessMethods() {
+    // Test that Variant can access its fields via getFields()
+    HoodieSchema.Variant variantSchema = HoodieSchema.createVariant();
+
+    List<HoodieSchemaField> fields = variantSchema.getFields();
+    assertEquals(2, fields.size());
+
+    // Test getField()
+    Option<HoodieSchemaField> metadataField = 
variantSchema.getField("metadata");
+    assertTrue(metadataField.isPresent());
+    assertEquals("metadata", metadataField.get().name());
+
+    Option<HoodieSchemaField> valueField = variantSchema.getField("value");
+    assertTrue(valueField.isPresent());
+    assertEquals("value", valueField.get().name());
+  }
+
+  @Test
+  public void testVariantWithComplexTypedValue() {
+    // Create a shredded variant with a complex record as typed_value
+    List<HoodieSchemaField> recordFields = Arrays.asList(
+        HoodieSchemaField.of("name", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+        HoodieSchemaField.of("age", HoodieSchema.create(HoodieSchemaType.INT))
+    );
+    HoodieSchema complexSchema = HoodieSchema.createRecord("Person", null, 
null, recordFields);
+
+    HoodieSchema.Variant shreddedVariant = 
HoodieSchema.createVariantShredded(complexSchema);
+
+    assertTrue(shreddedVariant.isShredded());
+    assertTrue(shreddedVariant.getTypedValueField().isPresent());
+    assertEquals(HoodieSchemaType.RECORD, 
shreddedVariant.getTypedValueField().get().getType());
+
+    // Verify it can be serialized and parsed back
+    String schemaJson = shreddedVariant.toString();
+    HoodieSchema parsedSchema = HoodieSchema.parse(schemaJson);
+
+    assertInstanceOf(HoodieSchema.Variant.class, parsedSchema);
+    HoodieSchema.Variant parsedVariant = (HoodieSchema.Variant) parsedSchema;
+    assertTrue(parsedVariant.isShredded());
+    assertTrue(parsedVariant.getTypedValueField().isPresent());
+  }
+
+  @Test
+  public void testVariantNullableValueFieldForShredded() {
+    // Shredded variant should have nullable value field
+    HoodieSchema.Variant shreddedVariant = 
HoodieSchema.createVariantShredded(null);
+
+    HoodieSchema valueFieldSchema = shreddedVariant.getValueField();
+    assertTrue(valueFieldSchema.isNullable());
+    assertEquals(HoodieSchemaType.UNION, valueFieldSchema.getType());
+  }
+
+  @Test
+  public void testVariantNonNullableValueFieldForUnshredded() {
+    // Unshredded variant should have non-nullable value field
+    HoodieSchema.Variant unshreddedVariant = HoodieSchema.createVariant();
+
+    HoodieSchema valueFieldSchema = unshreddedVariant.getValueField();
+    assertFalse(valueFieldSchema.isNullable());
+    assertEquals(HoodieSchemaType.BYTES, valueFieldSchema.getType());
+  }
+
+  @Test
+  public void testVariantCustomNameAndNamespace() {
+    // Test creating variant with custom name and namespace
+    String customName = "my_custom_variant";
+    String customNamespace = "org.apache.hudi.test";
+    String customDoc = "Custom variant for testing";
+
+    HoodieSchema.Variant customVariant = 
HoodieSchema.createVariant(customName, customNamespace, customDoc);
+
+    assertEquals(customName, customVariant.getAvroSchema().getName());
+    assertEquals(customNamespace, 
customVariant.getAvroSchema().getNamespace());
+    assertEquals(customDoc, customVariant.getAvroSchema().getDoc());
+
+    // Verify it's still recognized as a Variant
+    
assertTrue(HoodieSchema.Variant.isVariantSchema(customVariant.getAvroSchema()));
+    assertEquals(HoodieSchemaType.VARIANT, customVariant.getType());
+  }
+
+  @Test
+  public void testVariantLogicalTypeSingleton() {
+    // Test that VariantLogicalType is a true singleton
+    HoodieSchema.VariantLogicalType instance1 = 
HoodieSchema.VariantLogicalType.variant();
+    HoodieSchema.VariantLogicalType instance2 = 
HoodieSchema.VariantLogicalType.variant();
+
+    // Verify they are the exact same instance (reference equality)
+    assertSame(instance1, instance2);
+    assertTrue(instance1 == instance2);

Review Comment:
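   This is the same as `assertSame(instance1, instance2)` on the previous line, so the `assertTrue(instance1 == instance2)` check is redundant and can be removed.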



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala:
##########
@@ -159,7 +159,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase 
with SparkAdapterSuppor
   }
 
   test("Test Create Expression Index Syntax") {
-    withTempDir { tmp =>
+    withTempDir { tmp =>2

Review Comment:
   This looks accidental: the stray `2` after `tmp =>` should be removed.



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -1343,6 +1480,263 @@ public enum TimePrecision {
     MICROS
   }
 
+  /**
+   * Custom Avro LogicalType for Variant.
+   * This logical type is applied to RECORD schemas that represent Variant 
types.
+   *
+   * <p>This is a singleton type - use {@link #variant()} to get the 
instance.</p>
+   */
+  public static class VariantLogicalType extends LogicalType {
+
+    private static final String VARIANT_LOGICAL_TYPE_NAME = "variant";
+    // Eager initialization of singleton
+    private static final VariantLogicalType INSTANCE = new 
VariantLogicalType();
+
+    private VariantLogicalType() {
+      super(VariantLogicalType.VARIANT_LOGICAL_TYPE_NAME);
+    }
+
+    /**
+     * Returns the singleton instance of VariantLogicalType.
+     *
+     * @return the Variant logical type instance
+     */
+    public static VariantLogicalType variant() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      if (schema.getType() != Schema.Type.RECORD) {
+        throw new IllegalArgumentException("Variant logical type can only be 
applied to RECORD schemas, got: " + schema.getType());
+      }
+    }
+  }
+
+  /**
+   * Factory for creating VariantLogicalType instances.
+   */
+  private static class VariantLogicalTypeFactory implements 
LogicalTypes.LogicalTypeFactory {
+    @Override
+    public LogicalType fromSchema(Schema schema) {
+      return VariantLogicalType.variant();
+    }
+
+    @Override
+    public String getTypeName() {
+      return VariantLogicalType.VARIANT_LOGICAL_TYPE_NAME;
+    }
+  }
+
+  /**
+   * Variant schema type representing semi-structured data that can store 
values of different types.
+   *
+   * <p>According to the Parquet specification, a Variant is represented as a 
record/group with binary fields:
+   * <ul>
+   *   <li>metadata: Binary field containing the Variant metadata component 
(always required)</li>
+   *   <li>value: Binary field containing the Variant value component 
(required for unshredded, optional for shredded)</li>
+   *   <li>typed_value: Optional field for shredded variants, stores values 
matching a specific type (type varies)</li>
+   * </ul>
+   * </p>
+   *
+   * <p>This implementation supports both:</p>
+   * <ul>
+   *   <li><b>Unshredded Variant</b>: metadata (required) and value (required) 
fields.</li>
+   *   <li><b>Shredded Variant</b>: metadata (required), value (optional), and 
typed_value (optional).</li>
+   * </ul>
+   *
+   * <p>Backwards compatibility:</p>
+   * <ul>
+   *   <li>Old Hudi versions will read it as a regular record with byte array 
fields</li>
+   *   <li>New Hudi versions can detect it as a Variant type via the Avro 
LogicalType mechanism</li>
+   * </ul>
+   */
+  public static class Variant extends HoodieSchema {
+
+    private static final String VARIANT_DEFAULT_NAME = "variant";
+    private static final String VARIANT_METADATA_FIELD = "metadata";
+    private static final String VARIANT_VALUE_FIELD = "value";
+    private static final String VARIANT_TYPED_VALUE_FIELD = "typed_value";
+
+    private final boolean isShredded;
+    private final Option<HoodieSchema> typedValueSchema;
+
+    /**
+     * Creates a new Variant HoodieSchema wrapping the given Avro schema.
+     *
+     * @param avroSchema the Avro schema to wrap, must be a valid Variant 
schema
+     * @throws IllegalArgumentException if avroSchema is null or not a valid 
Variant schema
+     */
+    private Variant(Schema avroSchema) {
+      super(avroSchema);
+      this.isShredded = determineIfShredded(avroSchema);
+      this.typedValueSchema = extractTypedValueSchema(avroSchema);
+      validateVariantSchema(avroSchema);
+    }
+
+    /**
+     * Determines if the variant schema is shredded based on the value field 
nullability or presence of typed_value.
+     *
+     * @param avroSchema the schema to check
+     * @return true if the value field is nullable or typed_value exists 
(shredded), false otherwise (unshredded)
+     */
+    private boolean determineIfShredded(Schema avroSchema) {
+      // Check if typed_value field exists
+      Schema.Field typedValueField = 
avroSchema.getField(VARIANT_TYPED_VALUE_FIELD);
+      if (typedValueField != null) {
+        return true;
+      }
+
+      // Check if value field is nullable
+      Schema.Field valueField = avroSchema.getField(VARIANT_VALUE_FIELD);
+      if (valueField == null) {
+        return false;
+      }
+      Schema valueSchema = valueField.schema();
+      if (valueSchema.getType() == Schema.Type.UNION) {
+        return valueSchema.getTypes().stream().anyMatch(s -> s.getType() == 
Schema.Type.NULL);
+      }
+      return false;
+    }
+
+    /**
+     * Extracts the typed_value field schema if present.
+     *
+     * @param avroSchema the schema to extract from
+     * @return Option containing the typed_value schema, or Option.empty() if 
not present
+     */
+    private Option<HoodieSchema> extractTypedValueSchema(Schema avroSchema) {
+      Schema.Field typedValueField = 
avroSchema.getField(VARIANT_TYPED_VALUE_FIELD);
+      if (typedValueField != null) {
+        return 
Option.of(HoodieSchema.fromAvroSchema(typedValueField.schema()));
+      }
+      return Option.empty();
+    }
+
+    /**
+     * Validates that the given Avro schema conforms to the Variant 
specification.
+     *
+     * @param avroSchema the schema to validate
+     * @throws IllegalArgumentException if the schema is not a valid Variant 
schema
+     */
+    private void validateVariantSchema(Schema avroSchema) {
+      if (avroSchema.getType() != Schema.Type.RECORD) {
+        throw new IllegalArgumentException("Variant schema must be a RECORD 
type, got: " + avroSchema.getType());
+      }
+
+      // Check for metadata field (always required)
+      Schema.Field metadataField = avroSchema.getField(VARIANT_METADATA_FIELD);
+      if (metadataField == null) {
+        throw new IllegalArgumentException("Variant schema must have a '" + 
VARIANT_METADATA_FIELD + "' field");
+      }
+      if (metadataField.schema().getType() != Schema.Type.BYTES) {
+        throw new IllegalArgumentException("Variant metadata field must be 
BYTES type, got: " + metadataField.schema().getType());
+      }
+
+      // Check for value field
+      Schema.Field valueField = avroSchema.getField(VARIANT_VALUE_FIELD);
+      if (valueField == null) {
+        throw new IllegalArgumentException("Variant schema must have a '" + 
VARIANT_VALUE_FIELD + "' field");
+      }
+
+      Schema valueSchema = valueField.schema();
+      if (isShredded) {
+        // Shredded: value should be nullable (union with null and bytes)
+        if (valueSchema.getType() == Schema.Type.UNION) {
+          boolean hasNull = valueSchema.getTypes().stream().anyMatch(s -> 
s.getType() == Schema.Type.NULL);
+          boolean hasBytes = valueSchema.getTypes().stream().anyMatch(s -> 
s.getType() == Schema.Type.BYTES);
+          if (!hasNull || !hasBytes) {
+            throw new IllegalArgumentException("Shredded Variant value field 
should be a union of [null, bytes]");
+          }
+        } else if (valueSchema.getType() != Schema.Type.BYTES) {
+          // If not a union, it should at least be bytes (some shredded 
variants may have non-null value)
+          throw new IllegalArgumentException("Shredded Variant value field 
must be BYTES or nullable BYTES, got: " + valueSchema.getType());
+        }
+      } else {
+        // Unshredded: value must be non-nullable bytes
+        if (valueSchema.getType() != Schema.Type.BYTES) {
+          throw new IllegalArgumentException("Unshredded Variant value field 
must be BYTES type, got: " + valueSchema.getType());
+        }
+      }
+    }
+
+    /**
+     * Checks if the given Avro schema is a Variant schema.
+     * This checks for the Variant logical type.
+     *
+     * @param avroSchema the schema to check
+     * @return true if the schema has a Variant logical type
+     */
+    public static boolean isVariantSchema(Schema avroSchema) {
+      if (avroSchema == null || avroSchema.getType() != Schema.Type.RECORD) {
+        return false;
+      }
+      LogicalType logicalType = avroSchema.getLogicalType();
+      return logicalType instanceof VariantLogicalType;
+    }
+
+    /**
+     * Checks if this is a shredded variant (has typed_value field or nullable 
value field).
+     *
+     * @return true if this is a shredded variant, false for unshredded
+     */
+    public boolean isShredded() {
+      return isShredded;
+    }
+
+    /**
+     * Returns the metadata field schema.
+     *
+     * @return HoodieSchema for the metadata field (always BYTES)
+     */
+    public HoodieSchema getMetadataField() {
+      Schema.Field metadataField = 
getAvroSchema().getField(VARIANT_METADATA_FIELD);
+      return HoodieSchema.fromAvroSchema(metadataField.schema());
+    }
+
+    /**
+     * Returns the value field schema.
+     *
+     * @return HoodieSchema for the value field (BYTES for unshredded, 
nullable BYTES for shredded)
+     */
+    public HoodieSchema getValueField() {
+      Schema.Field valueField = getAvroSchema().getField(VARIANT_VALUE_FIELD);
+      return HoodieSchema.fromAvroSchema(valueField.schema());
+    }
+
+    /**
+     * Returns the typed_value field schema if present (shredded variants 
only).
+     *
+     * @return Option containing the typed_value schema, or Option.empty() if 
not present
+     */
+    public Option<HoodieSchema> getTypedValueField() {

Review Comment:
   Should we switch to Lombok for the getters, `equals`, and `hashCode`?



##########
hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java:
##########
@@ -1042,4 +1044,526 @@ public void 
testParseLogicalTypesCreatesCorrectSubclasses() {
     HoodieSchema.Time tmMicros = (HoodieSchema.Time) parsedTimeMicros;
     assertEquals(HoodieSchema.TimePrecision.MICROS, tmMicros.getPrecision());
   }
+
+  // ==================== Variant Type Tests ====================
+
+  @Test
+  public void testCreateUnshreddedVariant() {
+    HoodieSchema.Variant variantSchema = HoodieSchema.createVariant();
+
+    assertNotNull(variantSchema);
+    assertInstanceOf(HoodieSchema.Variant.class, variantSchema);
+    assertEquals(HoodieSchemaType.VARIANT, variantSchema.getType());
+    assertEquals("variant", variantSchema.getName());
+    assertFalse(variantSchema.isShredded());
+    assertFalse(variantSchema.getTypedValueField().isPresent());
+
+    // Verify fields
+    List<HoodieSchemaField> fields = variantSchema.getFields();
+    assertEquals(2, fields.size());
+    assertEquals("metadata", fields.get(0).name());
+    assertEquals("value", fields.get(1).name());
+
+    // Verify field types
+    assertEquals(HoodieSchemaType.BYTES, fields.get(0).schema().getType());
+    assertEquals(HoodieSchemaType.BYTES, fields.get(1).schema().getType());
+
+    // Value field should be non-nullable for unshredded
+    assertFalse(fields.get(1).schema().isNullable());
+  }
+
+  @Test
+  public void testCreateUnshreddedVariantWithCustomName() {
+    final String name = "my_variant";
+    final String namespace = "org.apache.hoodie";
+    final String doc = "Custom variant schema";
+    HoodieSchema.Variant variantSchema = HoodieSchema.createVariant(name, 
namespace, doc);
+
+    assertNotNull(variantSchema);
+    assertEquals(name, variantSchema.getAvroSchema().getName());
+    assertEquals(namespace, variantSchema.getAvroSchema().getNamespace());
+    assertEquals(doc, variantSchema.getAvroSchema().getDoc());
+    assertFalse(variantSchema.isShredded());
+  }
+
+  @Test
+  public void testCreateShreddedVariant() {
+    final String recordName = "TypedValue";
+    final String fieldName = "data";
+    HoodieSchema typedValueSchema = HoodieSchema.createRecord(recordName, 
null, null,
+        Collections.singletonList(HoodieSchemaField.of(fieldName, 
HoodieSchema.create(HoodieSchemaType.STRING))));
+
+    HoodieSchema.Variant variantSchema = 
HoodieSchema.createVariantShredded(typedValueSchema);
+
+    assertNotNull(variantSchema);
+    assertInstanceOf(HoodieSchema.Variant.class, variantSchema);
+    assertEquals(HoodieSchemaType.VARIANT, variantSchema.getType());
+    assertTrue(variantSchema.isShredded());
+    assertTrue(variantSchema.getTypedValueField().isPresent());
+
+    // Verify fields
+    List<HoodieSchemaField> fields = variantSchema.getFields();
+    assertEquals(3, fields.size());
+    assertEquals("metadata", fields.get(0).name());
+    assertEquals("value", fields.get(1).name());
+    assertEquals("typed_value", fields.get(2).name());
+
+    // Value field should be nullable for shredded
+    assertTrue(fields.get(1).schema().isNullable());
+
+    // Verify typed_value schema
+    HoodieSchema retrievedTypedValueSchema = 
variantSchema.getTypedValueField().get();
+    assertEquals(HoodieSchemaType.RECORD, retrievedTypedValueSchema.getType());
+  }
+
+  @Test
+  public void testCreateShreddedVariantWithoutTypedValue() {
+    HoodieSchema.Variant variantSchema = 
HoodieSchema.createVariantShredded(null);
+
+    assertNotNull(variantSchema);
+    assertTrue(variantSchema.isShredded());
+    assertFalse(variantSchema.getTypedValueField().isPresent());
+
+    // Verify fields should have metadata and nullable value, but no 
typed_value
+    List<HoodieSchemaField> fields = variantSchema.getFields();
+    assertEquals(2, fields.size());
+    assertEquals("metadata", fields.get(0).name());
+    assertEquals("value", fields.get(1).name());
+
+    // Value field should be nullable even without typed_value
+    assertTrue(fields.get(1).schema().isNullable());
+  }
+
+  @Test
+  public void testVariantLogicalTypeDetection() {
+    HoodieSchema.Variant variantSchema = HoodieSchema.createVariant();
+    Schema avroSchema = variantSchema.getAvroSchema();
+
+    // Verify logical type is set
+    assertNotNull(avroSchema.getLogicalType());
+    assertInstanceOf(HoodieSchema.VariantLogicalType.class, 
avroSchema.getLogicalType());
+    assertEquals("variant", avroSchema.getLogicalType().getName());
+
+    // Verify isVariantSchema detection
+    assertTrue(HoodieSchema.Variant.isVariantSchema(avroSchema));
+  }
+
+  @Test
+  public void testVariantRoundTripSerializationToJson() {
+    // Create unshredded variant
+    HoodieSchema.Variant originalVariant = HoodieSchema.createVariant();
+    String jsonSchema = originalVariant.toString();
+
+    // Parse back from JSON
+    HoodieSchema parsedSchema = HoodieSchema.parse(jsonSchema);
+
+    // Verify it's still detected as Variant
+    assertInstanceOf(HoodieSchema.Variant.class, parsedSchema);
+    HoodieSchema.Variant parsedVariant = (HoodieSchema.Variant) parsedSchema;
+    assertFalse(parsedVariant.isShredded());
+    assertEquals(HoodieSchemaType.VARIANT, parsedVariant.getType());
+
+    // Verify logical type is preserved
+    
assertTrue(HoodieSchema.Variant.isVariantSchema(parsedVariant.getAvroSchema()));
+  }
+
+  @Test
+  public void testVariantShreddedRoundTripSerializationToJson() {
+    // Create shredded variant with typed_value
+    HoodieSchema typedValueSchema = 
HoodieSchema.create(HoodieSchemaType.STRING);
+    HoodieSchema.Variant originalVariant = 
HoodieSchema.createVariantShredded(typedValueSchema);
+    String jsonSchema = originalVariant.toString();

Review Comment:
   Let's add a similar test where the variant is a field within a record, to better simulate variants as part of a table schema.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to