the-other-tim-brown commented on code in PR #17581:
URL: https://github.com/apache/hudi/pull/17581#discussion_r2638147894
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java:
##########
@@ -681,4 +681,44 @@ public static String getRecordQualifiedName(String
tableName) {
// Delegate to AvroSchemaUtils
return AvroSchemaUtils.getAvroRecordQualifiedName(tableName);
}
+
+ /**
+ * Resolves a union schema by finding the schema matching the given full
name.
+ * Handles both simple nullable unions (null + non-null) and complex unions
with multiple types.
+ *
+ * <p>This method supports the following union types:
+ * <ul>
+ * <li>Simple nullable unions: {@code ["null", "Type"]} - returns the
non-null type</li>
+ * <li>Complex unions: {@code ["null", "TypeA", "TypeB"]} - returns the
type matching fieldSchemaFullName</li>
+ * <li>Non-union schemas - returns the schema as-is</li>
+ * </ul>
+ *
+ * @param schema the schema to resolve (may or may not be a union)
+ * @param fieldSchemaFullName the full name of the schema to find within the
union
+ * @return the resolved schema
+ * @throws org.apache.hudi.internal.schema.HoodieSchemaException if the
union cannot be resolved or no matching type is found
+ */
+ public static HoodieSchema resolveUnionSchema(HoodieSchema schema, String
fieldSchemaFullName) {
+ if (schema.getType() != HoodieSchemaType.UNION) {
+ return schema;
+ }
+
+ List<HoodieSchema> innerTypes = schema.getTypes();
+ if (innerTypes.size() == 2 && schema.isNullable()) {
+ // this is a basic nullable field so handle it more efficiently
+ return schema.getNonNullType();
+ }
+
+ HoodieSchema nonNullType = innerTypes.stream()
+ .filter(it -> it.getType() != HoodieSchemaType.NULL &&
java.util.Objects.equals(it.getFullName(), fieldSchemaFullName))
Review Comment:
Nitpick: let's import `java.util.Objects` instead of using the fully qualified name.
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java:
##########
@@ -681,4 +681,44 @@ public static String getRecordQualifiedName(String
tableName) {
// Delegate to AvroSchemaUtils
return AvroSchemaUtils.getAvroRecordQualifiedName(tableName);
}
+
+ /**
+ * Resolves a union schema by finding the schema matching the given full
name.
+ * Handles both simple nullable unions (null + non-null) and complex unions
with multiple types.
+ *
+ * <p>This method supports the following union types:
+ * <ul>
+ * <li>Simple nullable unions: {@code ["null", "Type"]} - returns the
non-null type</li>
+ * <li>Complex unions: {@code ["null", "TypeA", "TypeB"]} - returns the
type matching fieldSchemaFullName</li>
+ * <li>Non-union schemas - returns the schema as-is</li>
+ * </ul>
+ *
+ * @param schema the schema to resolve (may or may not be a union)
+ * @param fieldSchemaFullName the full name of the schema to find within the
union
+ * @return the resolved schema
+ * @throws org.apache.hudi.internal.schema.HoodieSchemaException if the
union cannot be resolved or no matching type is found
+ */
+ public static HoodieSchema resolveUnionSchema(HoodieSchema schema, String
fieldSchemaFullName) {
+ if (schema.getType() != HoodieSchemaType.UNION) {
+ return schema;
+ }
+
+ List<HoodieSchema> innerTypes = schema.getTypes();
+ if (innerTypes.size() == 2 && schema.isNullable()) {
+ // this is a basic nullable field so handle it more efficiently
+ return schema.getNonNullType();
+ }
+
+ HoodieSchema nonNullType = innerTypes.stream()
+ .filter(it -> it.getType() != HoodieSchemaType.NULL &&
java.util.Objects.equals(it.getFullName(), fieldSchemaFullName))
+ .findFirst()
+ .orElse(null);
+
+ if (nonNullType == null) {
+ throw new org.apache.hudi.internal.schema.HoodieSchemaException(
Review Comment:
Similarly here, let's import the class (`HoodieSchemaException`) instead of using its fully qualified name.
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveTypeUtils.java:
##########
@@ -224,8 +225,8 @@ private static int getIntValue(Object obj) {
}
// added this from AvroSerdeUtils in hive latest
- public static int getIntFromSchema(Schema schema, String name) {
- Object obj = schema.getObjectProp(name);
+ public static int getIntFromSchema(HoodieSchema schema, String name) {
+ Object obj = schema.getProp(name);
Review Comment:
This is not equivalent to the Avro schema's `getObjectProp`. We need to
update the implementation to call `getObjectProp`, or else you will get a null
response for non-string (e.g. integer-valued) props.
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveTypeUtils.java:
##########
@@ -259,22 +260,22 @@ private static TypeInfo generateTypeInfoWorker(Schema
schema,
}
}
- private static TypeInfo generateRecordTypeInfo(Schema schema,
- Set<Schema> seenSchemas)
throws AvroSerdeException {
- assert schema.getType().equals(Schema.Type.RECORD);
+ private static TypeInfo generateRecordTypeInfo(HoodieSchema schema,
+ Set<HoodieSchema>
seenSchemas) throws AvroSerdeException {
+ ValidationUtils.checkArgument(schema.getType() == RECORD, () -> schema + "
is not a RECORD");
if (seenSchemas == null) {
- seenSchemas = Collections.newSetFromMap(new IdentityHashMap<Schema,
Boolean>());
+ seenSchemas = Collections.newSetFromMap(new IdentityHashMap<>());
Review Comment:
I'm wondering whether there is any reason this can't be a simple `HashSet`.
##########
hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java:
##########
@@ -1539,4 +1540,202 @@ public void
testConvertValueForSpecificDataTypes_UnionWithNull() {
assertTrue(result instanceof LocalDate);
assertEquals(LocalDate.of(2023, 1, 1), result);
}
+
+ @Test
+ void testResolveUnionSchemaWithNonUnionSchema() {
+ // Non-union schemas should be returned as-is
+ HoodieSchema stringSchema = HoodieSchema.create(HoodieSchemaType.STRING);
+ HoodieSchema result = HoodieSchemaUtils.resolveUnionSchema(stringSchema,
"any");
+
+ assertSame(stringSchema, result);
+ }
+
+ @Test
+ void testResolveUnionSchemaWithSimpleNullableUnion() {
+ // Simple nullable union: ["null", "string"] should return the non-null
type efficiently
+ HoodieSchema nullableString =
HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING));
+ HoodieSchema result = HoodieSchemaUtils.resolveUnionSchema(nullableString,
"string");
+
+ assertEquals(HoodieSchemaType.STRING, result.getType());
+ }
+
+ @Test
+ void testResolveUnionSchemaWithSimpleNullableRecord() {
+ // Test with nullable record type
+ HoodieSchema personSchema = HoodieSchema.createRecord(
+ "Person",
+ null,
+ null,
+ Collections.singletonList(
+ HoodieSchemaField.of("name",
HoodieSchema.create(HoodieSchemaType.STRING))
+ )
+ );
+
+ HoodieSchema nullablePerson = HoodieSchema.createNullable(personSchema);
+ HoodieSchema result = HoodieSchemaUtils.resolveUnionSchema(nullablePerson,
"Person");
+
+ assertEquals(HoodieSchemaType.RECORD, result.getType());
+ assertEquals("Person", result.getName());
+ assertFalse(result.isNullable());
+ }
+
+ @Test
+ void testResolveUnionSchemaWithComplexUnionMatchingFullName() {
+ // Complex union with 3+ types, matching by fullName
+ String unionSchemaJson = "{"
+ + "\"type\":\"record\","
+ + "\"name\":\"Container\","
+ + "\"fields\":[{"
+ + " \"name\":\"data\","
+ + " \"type\":[\"null\","
+ + "
{\"type\":\"record\",\"name\":\"PersonRecord\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]},"
+ + "
{\"type\":\"record\",\"name\":\"CompanyRecord\",\"fields\":[{\"name\":\"companyName\",\"type\":\"string\"}]}"
+ + " ]"
+ + "}]}";
+
+ HoodieSchema containerSchema = HoodieSchema.parse(unionSchemaJson);
+ HoodieSchema dataFieldSchema =
containerSchema.getField("data").get().schema();
+
+ // Resolve to PersonRecord
+ HoodieSchema personResult =
HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema, "PersonRecord");
+ assertEquals(HoodieSchemaType.RECORD, personResult.getType());
+ assertEquals("PersonRecord", personResult.getName());
+ assertFalse(personResult.isNullable());
+ assertTrue(personResult.getField("name").isPresent());
+
+ // Resolve to CompanyRecord
+ HoodieSchema companyResult =
HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema, "CompanyRecord");
+ assertEquals(HoodieSchemaType.RECORD, companyResult.getType());
+ assertEquals("CompanyRecord", companyResult.getName());
+ assertFalse(companyResult.isNullable());
+ assertTrue(companyResult.getField("companyName").isPresent());
+ }
+
+ @Test
+ void testResolveUnionSchemaWithNonNullableTwoTypeUnion() {
+ // Union of two non-nullable types should use the complex resolution path
+ String unionSchemaJson = "{"
+ + "\"type\":\"record\","
+ + "\"name\":\"Container\","
+ + "\"fields\":[{"
+ + " \"name\":\"data\","
+ + " \"type\":["
+ + "
{\"type\":\"record\",\"name\":\"TypeA\",\"fields\":[{\"name\":\"fieldA\",\"type\":\"string\"}]},"
+ + "
{\"type\":\"record\",\"name\":\"TypeB\",\"fields\":[{\"name\":\"fieldB\",\"type\":\"int\"}]}"
+ + " ]"
+ + "}]}";
+
+ HoodieSchema containerSchema = HoodieSchema.parse(unionSchemaJson);
+ HoodieSchema dataFieldSchema =
containerSchema.getField("data").get().schema();
+
+ // Resolve to TypeA
+ HoodieSchema typeAResult =
HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema, "TypeA");
+ assertEquals(HoodieSchemaType.RECORD, typeAResult.getType());
+ assertEquals("TypeA", typeAResult.getName());
+ assertFalse(typeAResult.isNullable());
+
+ // Resolve to TypeB
+ HoodieSchema typeBResult =
HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema, "TypeB");
+ assertEquals(HoodieSchemaType.RECORD, typeBResult.getType());
+ assertEquals("TypeB", typeBResult.getName());
+ assertFalse(typeAResult.isNullable());
+ }
+
+ @Test
+ void testResolveUnionSchemaThrowsExceptionWhenNoMatch() {
+ // Complex union where the requested fullName doesn't match any type
+ String unionSchemaJson = "{"
+ + "\"type\":\"record\","
+ + "\"name\":\"Container\","
+ + "\"fields\":[{"
+ + " \"name\":\"data\","
+ + " \"type\":[\"null\","
+ + "
{\"type\":\"record\",\"name\":\"PersonRecord\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]},"
+ + "
{\"type\":\"record\",\"name\":\"CompanyRecord\",\"fields\":[{\"name\":\"companyName\",\"type\":\"string\"}]}"
+ + " ]"
+ + "}]}";
+
+ HoodieSchema containerSchema = HoodieSchema.parse(unionSchemaJson);
+ HoodieSchema dataFieldSchema =
containerSchema.getField("data").get().schema();
+
+ // Try to resolve to a type that doesn't exist in the union
+ org.apache.hudi.internal.schema.HoodieSchemaException exception =
assertThrows(
Review Comment:
Similarly, let's import the class here.
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java:
##########
@@ -268,28 +267,34 @@ public GenericRecord serialize(Object o, Schema schema) {
return record;
}
- private void setUpRecordFieldFromWritable(TypeInfo typeInfo, Object
structFieldData, ObjectInspector fieldOI, GenericData.Record record,
Schema.Field field) {
+ private void setUpRecordFieldFromWritable(TypeInfo typeInfo, Object
structFieldData, ObjectInspector fieldOI, GenericData.Record record,
HoodieSchemaField field) {
Object val = serialize(typeInfo, fieldOI, structFieldData, field.schema());
if (val == null) {
- if (field.defaultVal() instanceof JsonProperties.Null) {
+ Option<Object> defaultValOpt = field.defaultVal();
+ // In Avro/HoodieSchema, field.defaultVal() returns:
+ // - JsonProperties.Null / HoodieSchema.NULL_VALUE = if default is
explicitly null
+ // - null / isEmpty() = if field has NO default value
+ // - some value = if field has an actual default
+ if (defaultValOpt.isPresent() && defaultValOpt.get() ==
HoodieSchema.NULL_VALUE) {
Review Comment:
Could we simplify this to something like `Object recordValue =
field.defaultVal().map(v -> v == HoodieSchema.NULL_VALUE ? null :
v).orElse(null)`? Then we don't need the if/else. (The lambda parameter
can't be named `val`, because that would redeclare the local `val` defined above.)
##########
hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java:
##########
@@ -1539,4 +1540,202 @@ public void
testConvertValueForSpecificDataTypes_UnionWithNull() {
assertTrue(result instanceof LocalDate);
assertEquals(LocalDate.of(2023, 1, 1), result);
}
+
+ @Test
+ void testResolveUnionSchemaWithNonUnionSchema() {
+ // Non-union schemas should be returned as-is
+ HoodieSchema stringSchema = HoodieSchema.create(HoodieSchemaType.STRING);
+ HoodieSchema result = HoodieSchemaUtils.resolveUnionSchema(stringSchema,
"any");
+
+ assertSame(stringSchema, result);
+ }
+
+ @Test
+ void testResolveUnionSchemaWithSimpleNullableUnion() {
+ // Simple nullable union: ["null", "string"] should return the non-null
type efficiently
+ HoodieSchema nullableString =
HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING));
+ HoodieSchema result = HoodieSchemaUtils.resolveUnionSchema(nullableString,
"string");
+
+ assertEquals(HoodieSchemaType.STRING, result.getType());
+ }
+
+ @Test
+ void testResolveUnionSchemaWithSimpleNullableRecord() {
+ // Test with nullable record type
+ HoodieSchema personSchema = HoodieSchema.createRecord(
+ "Person",
+ null,
+ null,
+ Collections.singletonList(
+ HoodieSchemaField.of("name",
HoodieSchema.create(HoodieSchemaType.STRING))
+ )
+ );
+
+ HoodieSchema nullablePerson = HoodieSchema.createNullable(personSchema);
+ HoodieSchema result = HoodieSchemaUtils.resolveUnionSchema(nullablePerson,
"Person");
+
+ assertEquals(HoodieSchemaType.RECORD, result.getType());
+ assertEquals("Person", result.getName());
+ assertFalse(result.isNullable());
+ }
+
+ @Test
+ void testResolveUnionSchemaWithComplexUnionMatchingFullName() {
+ // Complex union with 3+ types, matching by fullName
+ String unionSchemaJson = "{"
+ + "\"type\":\"record\","
+ + "\"name\":\"Container\","
+ + "\"fields\":[{"
+ + " \"name\":\"data\","
+ + " \"type\":[\"null\","
+ + "
{\"type\":\"record\",\"name\":\"PersonRecord\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]},"
+ + "
{\"type\":\"record\",\"name\":\"CompanyRecord\",\"fields\":[{\"name\":\"companyName\",\"type\":\"string\"}]}"
+ + " ]"
+ + "}]}";
+
+ HoodieSchema containerSchema = HoodieSchema.parse(unionSchemaJson);
+ HoodieSchema dataFieldSchema =
containerSchema.getField("data").get().schema();
+
+ // Resolve to PersonRecord
+ HoodieSchema personResult =
HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema, "PersonRecord");
+ assertEquals(HoodieSchemaType.RECORD, personResult.getType());
+ assertEquals("PersonRecord", personResult.getName());
+ assertFalse(personResult.isNullable());
+ assertTrue(personResult.getField("name").isPresent());
+
+ // Resolve to CompanyRecord
+ HoodieSchema companyResult =
HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema, "CompanyRecord");
+ assertEquals(HoodieSchemaType.RECORD, companyResult.getType());
+ assertEquals("CompanyRecord", companyResult.getName());
+ assertFalse(companyResult.isNullable());
+ assertTrue(companyResult.getField("companyName").isPresent());
+ }
+
+ @Test
+ void testResolveUnionSchemaWithNonNullableTwoTypeUnion() {
+ // Union of two non-nullable types should use the complex resolution path
+ String unionSchemaJson = "{"
+ + "\"type\":\"record\","
+ + "\"name\":\"Container\","
+ + "\"fields\":[{"
+ + " \"name\":\"data\","
+ + " \"type\":["
+ + "
{\"type\":\"record\",\"name\":\"TypeA\",\"fields\":[{\"name\":\"fieldA\",\"type\":\"string\"}]},"
+ + "
{\"type\":\"record\",\"name\":\"TypeB\",\"fields\":[{\"name\":\"fieldB\",\"type\":\"int\"}]}"
+ + " ]"
+ + "}]}";
+
+ HoodieSchema containerSchema = HoodieSchema.parse(unionSchemaJson);
+ HoodieSchema dataFieldSchema =
containerSchema.getField("data").get().schema();
+
+ // Resolve to TypeA
+ HoodieSchema typeAResult =
HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema, "TypeA");
+ assertEquals(HoodieSchemaType.RECORD, typeAResult.getType());
+ assertEquals("TypeA", typeAResult.getName());
+ assertFalse(typeAResult.isNullable());
+
+ // Resolve to TypeB
+ HoodieSchema typeBResult =
HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema, "TypeB");
+ assertEquals(HoodieSchemaType.RECORD, typeBResult.getType());
+ assertEquals("TypeB", typeBResult.getName());
+ assertFalse(typeAResult.isNullable());
+ }
+
+ @Test
+ void testResolveUnionSchemaThrowsExceptionWhenNoMatch() {
+ // Complex union where the requested fullName doesn't match any type
+ String unionSchemaJson = "{"
+ + "\"type\":\"record\","
+ + "\"name\":\"Container\","
+ + "\"fields\":[{"
+ + " \"name\":\"data\","
+ + " \"type\":[\"null\","
+ + "
{\"type\":\"record\",\"name\":\"PersonRecord\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]},"
+ + "
{\"type\":\"record\",\"name\":\"CompanyRecord\",\"fields\":[{\"name\":\"companyName\",\"type\":\"string\"}]}"
+ + " ]"
+ + "}]}";
+
+ HoodieSchema containerSchema = HoodieSchema.parse(unionSchemaJson);
+ HoodieSchema dataFieldSchema =
containerSchema.getField("data").get().schema();
+
+ // Try to resolve to a type that doesn't exist in the union
+ org.apache.hudi.internal.schema.HoodieSchemaException exception =
assertThrows(
+ org.apache.hudi.internal.schema.HoodieSchemaException.class,
+ () -> HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema,
"AnimalRecord")
+ );
+
+ assertTrue(exception.getMessage().contains("Unsupported UNION type"));
+ assertTrue(exception.getMessage().contains("Only UNION of a null type and
a non-null type is supported"));
+ }
+
+ @Test
+ void testResolveUnionSchemaWithNamespacedRecords() {
+ // Test with fully qualified names (with namespace)
+ String unionSchemaJson = "{"
+ + "\"type\":\"record\","
+ + "\"name\":\"Container\","
+ + "\"namespace\":\"com.example\","
+ + "\"fields\":[{"
+ + " \"name\":\"data\","
+ + " \"type\":[\"null\","
+ + "
{\"type\":\"record\",\"name\":\"Person\",\"namespace\":\"com.example.model\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]},"
+ + "
{\"type\":\"record\",\"name\":\"Company\",\"namespace\":\"com.example.model\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}"
+ + " ]"
+ + "}]}";
+
+ HoodieSchema containerSchema = HoodieSchema.parse(unionSchemaJson);
+ HoodieSchema dataFieldSchema =
containerSchema.getField("data").get().schema();
+
+ // Resolve using fully qualified name
+ HoodieSchema personResult =
HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema,
"com.example.model.Person");
+ assertEquals(HoodieSchemaType.RECORD, personResult.getType());
+ assertEquals("Person", personResult.getName());
+ assertFalse(personResult.isNullable());
+ assertEquals("com.example.model", personResult.getNamespace().get());
+
+ // Resolve Company
+ HoodieSchema companyResult =
HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema,
"com.example.model.Company");
+ assertEquals(HoodieSchemaType.RECORD, companyResult.getType());
+ assertEquals("Company", companyResult.getName());
+ assertFalse(companyResult.isNullable());
+ }
+
+ @Test
+ void testResolveUnionSchemaWithPrimitiveTypes() {
+ // Test union containing primitive types (although less common)
+ // Union of null and string, but passed through the full name matching path
+ HoodieSchema nullableString =
HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING));
+
+ // For simple 2-element nullable union, should use fast path
+ HoodieSchema result = HoodieSchemaUtils.resolveUnionSchema(nullableString,
"string");
+ assertEquals(HoodieSchemaType.STRING, result.getType());
+ }
+
+ @Test
+ void testResolveUnionSchemaConsistencyWithOriginalAvroImpl() {
+ // Verify that HoodieSchemaUtils.resolveUnionSchema produces equivalent
results to the original AvroSchemaUtils.resolveUnionSchema
+ String unionSchemaJson = "{"
+ + "\"type\":\"record\","
+ + "\"name\":\"TestRecord\","
+ + "\"fields\":[{"
+ + " \"name\":\"unionField\","
+ + " \"type\":[\"null\","
+ + "
{\"type\":\"record\",\"name\":\"TypeA\",\"fields\":[{\"name\":\"a\",\"type\":\"int\"}]},"
+ + "
{\"type\":\"record\",\"name\":\"TypeB\",\"fields\":[{\"name\":\"b\",\"type\":\"string\"}]}"
+ + " ]"
+ + "}]}";
+
+ Schema avroSchema = new Schema.Parser().parse(unionSchemaJson);
+ HoodieSchema hoodieSchema = HoodieSchema.parse(unionSchemaJson);
+
+ Schema avroFieldSchema = avroSchema.getField("unionField").schema();
+ HoodieSchema hoodieFieldSchema =
hoodieSchema.getField("unionField").get().schema();
+
+ // Resolve using both implementations
+ Schema avroResult = AvroSchemaUtils.resolveUnionSchema(avroFieldSchema,
"TypeA");
+ HoodieSchema hoodieResult =
HoodieSchemaUtils.resolveUnionSchema(hoodieFieldSchema, "TypeA");
+
+ // Should produce equivalent schemas
+ assertEquals(avroResult.toString(), hoodieResult.toString());
Review Comment:
Can we assert that the objects themselves are equal, e.g.
`assertEquals(avroResult, hoodieResult.toAvroSchema())`, instead of comparing their string representations?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]