This is an automated email from the ASF dual-hosted git repository.
vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 91c15b10d5d4 feat(schema): phase 17 - Remove AvroSchemaUtils usage
(part 2) (#17581)
91c15b10d5d4 is described below
commit 91c15b10d5d468b4756385a4d4db678b286373ba
Author: voonhous <[email protected]>
AuthorDate: Thu Dec 25 02:18:09 2025 +0800
feat(schema): phase 17 - Remove AvroSchemaUtils usage (part 2) (#17581)
* Remove AvroSchemaUtils from HiveAvroSerializer and HiveTypeUtils
* Address comments
* Address comments
* Remove AvroSchemaUtils#resolveUnionSchema
* Add more tests to cover generateColumnTypes
* Fix HiveTypeUtils#generateTypeInfo behaviour and ensure that it is not
changed
* Address comments
* Increase Azure CI timeout
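Illustrative sketch of the call-site migration this series performs (adapted from the
HiveAvroSerializer hunk further below; here field is the enclosing record's schema field):

    // before: Avro-based helper, now removed from AvroSchemaUtils
    newFieldValue = rewriteRecordIgnoreResultCheck(record,
        AvroSchemaUtils.resolveUnionSchema(field.schema(), record.getSchema().getFullName()));
    // after: HoodieSchema-based helper added to HoodieSchemaUtils
    HoodieSchema fieldSchema = HoodieSchemaUtils.resolveUnionSchema(field.schema(), record.getSchema().getFullName());
    newFieldValue = rewriteRecordIgnoreResultCheck(record, fieldSchema);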
---
azure-pipelines-20230430.yml | 4 +-
.../hadoop/TestHoodieFileGroupReaderOnHive.java | 2 +-
.../org/apache/hudi/avro/AvroRecordContext.java | 16 +-
.../java/org/apache/hudi/avro/AvroSchemaUtils.java | 33 ---
.../hudi/common/schema/HoodieSchemaUtils.java | 42 ++++
.../hudi/common/schema/TestHoodieSchemaUtils.java | 171 ++++++++++++++
.../hudi/hadoop/HiveHoodieReaderContext.java | 2 +-
.../org/apache/hudi/hadoop/HiveRecordContext.java | 11 +-
.../org/apache/hudi/hadoop/HoodieHiveRecord.java | 11 +-
.../realtime/RealtimeCompactedRecordReader.java | 4 +-
.../hudi/hadoop/utils/HiveAvroSerializer.java | 126 +++++-----
.../apache/hudi/hadoop/utils/HiveTypeUtils.java | 189 ++++++++-------
.../apache/hudi/hadoop/TestHoodieHiveRecord.java | 9 +-
.../hudi/hadoop/utils/TestHiveAvroSerializer.java | 253 ++++++++++++++++++---
.../utils/TestHoodieArrayWritableSchemaUtils.java | 8 +-
15 files changed, 642 insertions(+), 239 deletions(-)
diff --git a/azure-pipelines-20230430.yml b/azure-pipelines-20230430.yml
index 0afe13c64bb6..58fcbc22677d 100644
--- a/azure-pipelines-20230430.yml
+++ b/azure-pipelines-20230430.yml
@@ -180,7 +180,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_2
displayName: FTA hudi-spark
- timeoutInMinutes: '90'
+ timeoutInMinutes: '180'
steps:
- task: Maven@4
displayName: maven install
@@ -492,7 +492,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_10
displayName: UT FT common & other modules
- timeoutInMinutes: '90'
+ timeoutInMinutes: '180'
steps:
- task: Docker@2
displayName: "login to docker hub"
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
index 05d2c898f868..47b4114dea06 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
@@ -121,7 +121,7 @@ public class TestHoodieFileGroupReaderOnHive extends
HoodieFileGroupReaderOnJava
List<HoodieSchemaField> fields = schema.getFields();
setHiveColumnNameProps(fields, jobConf, USE_FAKE_PARTITION);
try {
- String columnTypes =
HiveTypeUtils.generateColumnTypes(schema.toAvroSchema()).stream().map(TypeInfo::getTypeName).collect(Collectors.joining(","));
+ String columnTypes =
HiveTypeUtils.generateColumnTypes(schema).stream().map(TypeInfo::getTypeName).collect(Collectors.joining(","));
jobConf.set("columns.types", columnTypes + ",string");
} catch (SerDeException e) {
throw new RuntimeException(e);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
index 9fe1ea3f0650..947a292196ae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.util.AvroJavaTypeConverter;
@@ -32,7 +33,6 @@ import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -70,22 +70,20 @@ public class AvroRecordContext extends
RecordContext<IndexedRecord> {
public static Object getFieldValueFromIndexedRecord(
IndexedRecord record,
String fieldName) {
- Schema currentSchema = record.getSchema();
+ HoodieSchema currentSchema =
HoodieSchema.fromAvroSchema(record.getSchema());
IndexedRecord currentRecord = record;
String[] path = fieldName.split("\\.");
for (int i = 0; i < path.length; i++) {
- if (currentSchema.isUnion()) {
- currentSchema = AvroSchemaUtils.getNonNullTypeFromUnion(currentSchema);
- }
- Schema.Field field = currentSchema.getField(path[i]);
- if (field == null) {
+ currentSchema = currentSchema.getNonNullType();
+ Option<HoodieSchemaField> fieldOpt = currentSchema.getField(path[i]);
+ if (fieldOpt.isEmpty()) {
return null;
}
- Object value = currentRecord.get(field.pos());
+ Object value = currentRecord.get(fieldOpt.get().pos());
if (i == path.length - 1) {
return value;
}
- currentSchema = field.schema();
+ currentSchema = fieldOpt.get().schema();
currentRecord = (IndexedRecord) value;
}
return null;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index d9dd9869c69b..26fbd96b53df 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -477,39 +477,6 @@ public class AvroSchemaUtils {
}
}
- /**
- * Passed in {@code Union} schema and will try to resolve the field with the
{@code fieldSchemaFullName}
- * w/in the union returning its corresponding schema
- *
- * @param schema target schema to be inspected
- * @param fieldSchemaFullName target field-name to be looked up w/in the
union
- * @return schema of the field w/in the union identified by the {@code
fieldSchemaFullName}
- */
- public static Schema resolveUnionSchema(Schema schema, String
fieldSchemaFullName) {
- if (schema.getType() != Schema.Type.UNION) {
- return schema;
- }
-
- List<Schema> innerTypes = schema.getTypes();
- if (innerTypes.size() == 2 && isNullable(schema)) {
- // this is a basic nullable field so handle it more efficiently
- return getNonNullTypeFromUnion(schema);
- }
-
- Schema nonNullType =
- innerTypes.stream()
- .filter(it -> it.getType() != Schema.Type.NULL &&
Objects.equals(it.getFullName(), fieldSchemaFullName))
- .findFirst()
- .orElse(null);
-
- if (nonNullType == null) {
- throw new HoodieAvroSchemaException(
- String.format("Unsupported Avro UNION type %s: Only UNION of a null
type and a non-null type is supported", schema));
- }
-
- return nonNullType;
- }
-
/**
* Returns true in case provided {@link Schema} is nullable (ie accepting
null values),
* returns false otherwise
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
index cd0be22fd52e..bb54628e31af 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
@@ -35,6 +36,7 @@ import java.math.RoundingMode;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -724,4 +726,44 @@ public final class HoodieSchemaUtils {
private static boolean isSmallPrecisionDecimalField(HoodieSchema.Decimal
decimal) {
return decimal.getPrecision() <= 18;
}
+
+ /**
+ * Resolves a union schema by finding the schema matching the given full
name.
+ * Handles both simple nullable unions (null + non-null) and complex unions
with multiple types.
+ *
+ * <p>This method supports the following union types:
+ * <ul>
+ * <li>Simple nullable unions: {@code ["null", "Type"]} - returns the
non-null type</li>
+ * <li>Complex unions: {@code ["null", "TypeA", "TypeB"]} - returns the
type matching fieldSchemaFullName</li>
+ * <li>Non-union schemas - returns the schema as-is</li>
+ * </ul>
+ *
+ * @param schema the schema to resolve (may or may not be a union)
+ * @param fieldSchemaFullName the full name of the schema to find within the
union
+ * @return the resolved schema
+ * @throws HoodieSchemaException if the union cannot be resolved or no
matching type is found
+ */
+ public static HoodieSchema resolveUnionSchema(HoodieSchema schema, String
fieldSchemaFullName) {
+ if (schema.getType() != HoodieSchemaType.UNION) {
+ return schema;
+ }
+
+ List<HoodieSchema> innerTypes = schema.getTypes();
+ if (innerTypes.size() == 2 && schema.isNullable()) {
+ // this is a basic nullable field so handle it more efficiently
+ return schema.getNonNullType();
+ }
+
+ HoodieSchema nonNullType = innerTypes.stream()
+ .filter(it -> it.getType() != HoodieSchemaType.NULL &&
Objects.equals(it.getFullName(), fieldSchemaFullName))
+ .findFirst()
+ .orElse(null);
+
+ if (nonNullType == null) {
+ throw new HoodieSchemaException(
+ String.format("Unsupported UNION type %s: Only UNION of a null type
and a non-null type is supported", schema));
+ }
+
+ return nonNullType;
+ }
}
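Illustrative usage of the new helper (a sketch only, assuming a containerSchema parsed as in
the tests that follow; the "data" field and com.example.model.Person name come from those tests):

    HoodieSchema dataField = containerSchema.getField("data").get().schema();
    HoodieSchema person = HoodieSchemaUtils.resolveUnionSchema(dataField, "com.example.model.Person");
    // person.getType() == HoodieSchemaType.RECORD and person.getName() equals "Person"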
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
index e29ee000f766..b5f649851fad 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
@@ -23,6 +23,7 @@ import
org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;
@@ -1588,4 +1589,174 @@ public class TestHoodieSchemaUtils {
assertFalse(HoodieSchemaUtils.hasSmallPrecisionDecimalField(HoodieSchema.parse(SCHEMA_WITH_AVRO_TYPES_STR)));
assertFalse(HoodieSchemaUtils.hasSmallPrecisionDecimalField(HoodieSchema.parse(EXAMPLE_SCHEMA)));
}
+
+ @Test
+ void testResolveUnionSchemaWithNonUnionSchema() {
+ // Non-union schemas should be returned as-is
+ HoodieSchema stringSchema = HoodieSchema.create(HoodieSchemaType.STRING);
+ HoodieSchema result = HoodieSchemaUtils.resolveUnionSchema(stringSchema,
"any");
+
+ assertSame(stringSchema, result);
+ }
+
+ @Test
+ void testResolveUnionSchemaWithSimpleNullableUnion() {
+ // Simple nullable union: ["null", "string"] should return the non-null
type efficiently
+ HoodieSchema nullableString =
HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING));
+ HoodieSchema result = HoodieSchemaUtils.resolveUnionSchema(nullableString,
"string");
+
+ assertEquals(HoodieSchemaType.STRING, result.getType());
+ }
+
+ @Test
+ void testResolveUnionSchemaWithSimpleNullableRecord() {
+ // Test with nullable record type
+ HoodieSchema personSchema = HoodieSchema.createRecord(
+ "Person",
+ null,
+ null,
+ Collections.singletonList(
+ HoodieSchemaField.of("name",
HoodieSchema.create(HoodieSchemaType.STRING))
+ )
+ );
+
+ HoodieSchema nullablePerson = HoodieSchema.createNullable(personSchema);
+ HoodieSchema result = HoodieSchemaUtils.resolveUnionSchema(nullablePerson,
"Person");
+
+ assertEquals(HoodieSchemaType.RECORD, result.getType());
+ assertEquals("Person", result.getName());
+ assertFalse(result.isNullable());
+ }
+
+ @Test
+ void testResolveUnionSchemaWithComplexUnionMatchingFullName() {
+ // Complex union with 3+ types, matching by fullName
+ String unionSchemaJson = "{"
+ + "\"type\":\"record\","
+ + "\"name\":\"Container\","
+ + "\"fields\":[{"
+ + " \"name\":\"data\","
+ + " \"type\":[\"null\","
+ + "
{\"type\":\"record\",\"name\":\"PersonRecord\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]},"
+ + "
{\"type\":\"record\",\"name\":\"CompanyRecord\",\"fields\":[{\"name\":\"companyName\",\"type\":\"string\"}]}"
+ + " ]"
+ + "}]}";
+
+ HoodieSchema containerSchema = HoodieSchema.parse(unionSchemaJson);
+ HoodieSchema dataFieldSchema =
containerSchema.getField("data").get().schema();
+
+ // Resolve to PersonRecord
+ HoodieSchema personResult =
HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema, "PersonRecord");
+ assertEquals(HoodieSchemaType.RECORD, personResult.getType());
+ assertEquals("PersonRecord", personResult.getName());
+ assertFalse(personResult.isNullable());
+ assertTrue(personResult.getField("name").isPresent());
+
+ // Resolve to CompanyRecord
+ HoodieSchema companyResult =
HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema, "CompanyRecord");
+ assertEquals(HoodieSchemaType.RECORD, companyResult.getType());
+ assertEquals("CompanyRecord", companyResult.getName());
+ assertFalse(companyResult.isNullable());
+ assertTrue(companyResult.getField("companyName").isPresent());
+ }
+
+ @Test
+ void testResolveUnionSchemaWithNonNullableTwoTypeUnion() {
+ // Union of two non-nullable types should use the complex resolution path
+ String unionSchemaJson = "{"
+ + "\"type\":\"record\","
+ + "\"name\":\"Container\","
+ + "\"fields\":[{"
+ + " \"name\":\"data\","
+ + " \"type\":["
+ + "
{\"type\":\"record\",\"name\":\"TypeA\",\"fields\":[{\"name\":\"fieldA\",\"type\":\"string\"}]},"
+ + "
{\"type\":\"record\",\"name\":\"TypeB\",\"fields\":[{\"name\":\"fieldB\",\"type\":\"int\"}]}"
+ + " ]"
+ + "}]}";
+
+ HoodieSchema containerSchema = HoodieSchema.parse(unionSchemaJson);
+ HoodieSchema dataFieldSchema =
containerSchema.getField("data").get().schema();
+
+ // Resolve to TypeA
+ HoodieSchema typeAResult =
HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema, "TypeA");
+ assertEquals(HoodieSchemaType.RECORD, typeAResult.getType());
+ assertEquals("TypeA", typeAResult.getName());
+ assertFalse(typeAResult.isNullable());
+
+ // Resolve to TypeB
+ HoodieSchema typeBResult =
HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema, "TypeB");
+ assertEquals(HoodieSchemaType.RECORD, typeBResult.getType());
+ assertEquals("TypeB", typeBResult.getName());
+ assertFalse(typeBResult.isNullable());
+ }
+
+ @Test
+ void testResolveUnionSchemaThrowsExceptionWhenNoMatch() {
+ // Complex union where the requested fullName doesn't match any type
+ String unionSchemaJson = "{"
+ + "\"type\":\"record\","
+ + "\"name\":\"Container\","
+ + "\"fields\":[{"
+ + " \"name\":\"data\","
+ + " \"type\":[\"null\","
+ + "
{\"type\":\"record\",\"name\":\"PersonRecord\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]},"
+ + "
{\"type\":\"record\",\"name\":\"CompanyRecord\",\"fields\":[{\"name\":\"companyName\",\"type\":\"string\"}]}"
+ + " ]"
+ + "}]}";
+
+ HoodieSchema containerSchema = HoodieSchema.parse(unionSchemaJson);
+ HoodieSchema dataFieldSchema =
containerSchema.getField("data").get().schema();
+
+ // Try to resolve to a type that doesn't exist in the union
+ HoodieSchemaException exception = assertThrows(
+ HoodieSchemaException.class,
+ () -> HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema,
"AnimalRecord")
+ );
+
+ assertTrue(exception.getMessage().contains("Unsupported UNION type"));
+ assertTrue(exception.getMessage().contains("Only UNION of a null type and
a non-null type is supported"));
+ }
+
+ @Test
+ void testResolveUnionSchemaWithNamespacedRecords() {
+ // Test with fully qualified names (with namespace)
+ String unionSchemaJson = "{"
+ + "\"type\":\"record\","
+ + "\"name\":\"Container\","
+ + "\"namespace\":\"com.example\","
+ + "\"fields\":[{"
+ + " \"name\":\"data\","
+ + " \"type\":[\"null\","
+ + "
{\"type\":\"record\",\"name\":\"Person\",\"namespace\":\"com.example.model\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]},"
+ + "
{\"type\":\"record\",\"name\":\"Company\",\"namespace\":\"com.example.model\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}"
+ + " ]"
+ + "}]}";
+
+ HoodieSchema containerSchema = HoodieSchema.parse(unionSchemaJson);
+ HoodieSchema dataFieldSchema =
containerSchema.getField("data").get().schema();
+
+ // Resolve using fully qualified name
+ HoodieSchema personResult =
HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema,
"com.example.model.Person");
+ assertEquals(HoodieSchemaType.RECORD, personResult.getType());
+ assertEquals("Person", personResult.getName());
+ assertFalse(personResult.isNullable());
+ assertEquals("com.example.model", personResult.getNamespace().get());
+
+ // Resolve Company
+ HoodieSchema companyResult =
HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema,
"com.example.model.Company");
+ assertEquals(HoodieSchemaType.RECORD, companyResult.getType());
+ assertEquals("Company", companyResult.getName());
+ assertFalse(companyResult.isNullable());
+ }
+
+ @Test
+ void testResolveUnionSchemaWithPrimitiveTypes() {
+ // Test union containing primitive types (although less common)
+ // Union of null and string, but passed through the full name matching path
+ HoodieSchema nullableString =
HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING));
+
+ // For simple 2-element nullable union, should use fast path
+ HoodieSchema result = HoodieSchemaUtils.resolveUnionSchema(nullableString,
"string");
+ assertEquals(HoodieSchemaType.STRING, result.getType());
+ }
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index 4614854d4df3..38cd64c13f37 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -101,7 +101,7 @@ public class HiveHoodieReaderContext extends
HoodieReaderContext<ArrayWritable>
jobConf.set(serdeConstants.LIST_COLUMNS, String.join(",",
dataColumnNameList));
List<TypeInfo> columnTypes;
try {
- columnTypes =
HiveTypeUtils.generateColumnTypes(dataSchema.toAvroSchema());
+ columnTypes = HiveTypeUtils.generateColumnTypes(dataSchema);
} catch (AvroSerdeException e) {
throw new HoodieAvroSchemaException(String.format("Failed to generate
hive column types from schema: %s, due to %s", dataSchema, e));
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
index 8f52b290354b..bc12027921f6 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
@@ -31,7 +31,6 @@ import org.apache.hudi.hadoop.utils.HiveJavaTypeConverter;
import org.apache.hudi.hadoop.utils.HoodieArrayWritableSchemaUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -56,9 +55,9 @@ public class HiveRecordContext extends
RecordContext<ArrayWritable> {
return FIELD_ACCESSOR_INSTANCE;
}
- private final Map<Schema, HiveAvroSerializer> serializerCache = new
ConcurrentHashMap<>();
+ private final Map<HoodieSchema, HiveAvroSerializer> serializerCache = new
ConcurrentHashMap<>();
- private HiveAvroSerializer getHiveAvroSerializer(Schema schema) {
+ private HiveAvroSerializer getHiveAvroSerializer(HoodieSchema schema) {
return serializerCache.computeIfAbsent(schema, HiveAvroSerializer::new);
}
@@ -72,7 +71,7 @@ public class HiveRecordContext extends
RecordContext<ArrayWritable> {
@Override
public Object getValue(ArrayWritable record, HoodieSchema schema, String
fieldName) {
- return getHiveAvroSerializer(schema.toAvroSchema()).getValue(record,
fieldName);
+ return getHiveAvroSerializer(schema).getValue(record, fieldName);
}
@Override
@@ -92,7 +91,7 @@ public class HiveRecordContext extends
RecordContext<ArrayWritable> {
}
HoodieSchema schema = getSchemaFromBufferRecord(bufferedRecord);
ArrayWritable writable = bufferedRecord.getRecord();
- return new HoodieHiveRecord(key, writable, schema.toAvroSchema(),
getHiveAvroSerializer(schema.toAvroSchema()),
+ return new HoodieHiveRecord(key, writable, schema,
getHiveAvroSerializer(schema),
bufferedRecord.getHoodieOperation(),
bufferedRecord.getOrderingValue(), bufferedRecord.isDelete());
}
@@ -143,7 +142,7 @@ public class HiveRecordContext extends
RecordContext<ArrayWritable> {
@Override
public GenericRecord convertToAvroRecord(ArrayWritable record, HoodieSchema
schema) {
- return getHiveAvroSerializer(schema.toAvroSchema()).serialize(record);
+ return getHiveAvroSerializer(schema).serialize(record);
}
@Override
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
index 1ee19c32cecf..a3f5f857dc57 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.read.DeleteContext;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.OrderingValues;
@@ -60,9 +61,9 @@ public class HoodieHiveRecord extends
HoodieRecord<ArrayWritable> {
private final HiveAvroSerializer avroSerializer;
- protected Schema schema;
+ protected HoodieSchema schema;
- public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema,
HiveAvroSerializer avroSerializer) {
+ public HoodieHiveRecord(HoodieKey key, ArrayWritable data, HoodieSchema
schema, HiveAvroSerializer avroSerializer) {
super(key, data);
this.avroSerializer = avroSerializer;
this.schema = schema;
@@ -70,7 +71,7 @@ public class HoodieHiveRecord extends
HoodieRecord<ArrayWritable> {
isDelete = data == null;
}
- public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema,
HiveAvroSerializer avroSerializer, HoodieOperation hoodieOperation, Comparable
orderingValue, boolean isDelete) {
+ public HoodieHiveRecord(HoodieKey key, ArrayWritable data, HoodieSchema
schema, HiveAvroSerializer avroSerializer, HoodieOperation hoodieOperation,
Comparable orderingValue, boolean isDelete) {
super(key, data, hoodieOperation, isDelete, Option.empty());
this.orderingValue = orderingValue;
this.avroSerializer = avroSerializer;
@@ -78,7 +79,7 @@ public class HoodieHiveRecord extends
HoodieRecord<ArrayWritable> {
this.copy = false;
}
- private HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema,
HoodieOperation operation, boolean isCopy,
+ private HoodieHiveRecord(HoodieKey key, ArrayWritable data, HoodieSchema
schema, HoodieOperation operation, boolean isCopy,
HiveAvroSerializer avroSerializer) {
super(key, data, operation, Option.empty());
this.schema = schema;
@@ -246,7 +247,7 @@ public class HoodieHiveRecord extends
HoodieRecord<ArrayWritable> {
return avroSerializer.getValue(data, name);
}
- protected Schema getSchema() {
+ protected HoodieSchema getSchema() {
return schema;
}
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index fe0c91f6f3a1..d910145bd464 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -205,7 +205,7 @@ public class RealtimeCompactedRecordReader extends
AbstractRealtimeRecordReader
// for presto engine, the hiveSchema will be: col1,col2, but the
writerSchema will be col1,col2,par
// so to be compatible with hive and presto, we should rewrite oldRecord
before we call combineAndGetUpdateValue,
// once presto on hudi have its own mor reader, we can remove the rewrite
logical.
- GenericRecord genericRecord =
HiveAvroSerializer.rewriteRecordIgnoreResultCheck(oldRecord,
getLogScannerReaderSchema().toAvroSchema());
+ GenericRecord genericRecord =
HiveAvroSerializer.rewriteRecordIgnoreResultCheck(oldRecord,
getLogScannerReaderSchema());
RecordContext<IndexedRecord> recordContext =
AvroRecordContext.getFieldAccessorInstance();
BufferedRecord record = BufferedRecords.fromEngineRecord(genericRecord,
HoodieSchema.fromAvroSchema(genericRecord.getSchema()), recordContext,
orderingFields, newRecord.getRecordKey(), false);
BufferedRecord newBufferedRecord =
BufferedRecords.fromHoodieRecord(newRecord,
HoodieSchema.fromAvroSchema(getLogScannerReaderSchema().toAvroSchema()),
@@ -218,7 +218,7 @@ public class RealtimeCompactedRecordReader extends
AbstractRealtimeRecordReader
}
private GenericRecord convertArrayWritableToHoodieRecord(ArrayWritable
arrayWritable) {
- GenericRecord record = serializer.serialize(arrayWritable,
getHiveSchema().toAvroSchema());
+ GenericRecord record = serializer.serialize(arrayWritable,
getHiveSchema());
return record;
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java
index 16690ac360b5..c0444a609283 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java
@@ -18,14 +18,15 @@
package org.apache.hudi.hadoop.utils;
-import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieAvroSchemaException;
import org.apache.hudi.exception.HoodieException;
-import org.apache.avro.JsonProperties;
-import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericEnumSymbol;
@@ -75,17 +76,16 @@ public class HiveAvroSerializer {
private final List<String> columnNames;
private final List<TypeInfo> columnTypes;
private final ArrayWritableObjectInspector objectInspector;
- private final Schema recordSchema;
+ private final HoodieSchema recordSchema;
private static final Logger LOG =
LoggerFactory.getLogger(HiveAvroSerializer.class);
- public HiveAvroSerializer(Schema schema) {
- schema = AvroSchemaUtils.getNonNullTypeFromUnion(schema);
- if (schema.getType() != Schema.Type.RECORD) {
+ public HiveAvroSerializer(HoodieSchema schema) {
+ if (schema.getNonNullType().getType() != HoodieSchemaType.RECORD) {
throw new IllegalArgumentException("Expected record schema, but got: " +
schema);
}
this.recordSchema = schema;
- this.columnNames =
schema.getFields().stream().map(Schema.Field::name).map(String::toLowerCase).collect(Collectors.toList());
+ this.columnNames =
schema.getFields().stream().map(HoodieSchemaField::name).map(String::toLowerCase).collect(Collectors.toList());
try {
this.columnTypes = HiveTypeUtils.generateColumnTypes(schema);
} catch (AvroSerdeException e) {
@@ -171,7 +171,7 @@ public class HiveAvroSerializer {
+ "', but got " + context.typeInfo.getTypeName());
}
- if (!(context.schema.getType() == Schema.Type.RECORD)) {
+ if (!(context.schema.getType() == HoodieSchemaType.RECORD)) {
throw new HoodieException("Expected RecordSchema while resolving '" +
path[i]
+ "', but got " + context.schema.getType());
}
@@ -184,15 +184,13 @@ public class HiveAvroSerializer {
}
private FieldContext extractFieldFromRecord(ArrayWritable record,
StructObjectInspector structObjectInspector,
- List<TypeInfo> fieldTypes,
Schema schema, String fieldName) {
- Schema.Field schemaField = schema.getField(fieldName);
- if (schemaField == null) {
- throw new HoodieException("Field '" + fieldName + "' not found in
schema: " + schema);
- }
+ List<TypeInfo> fieldTypes,
HoodieSchema schema, String fieldName) {
+ HoodieSchemaField schemaField = schema.getField(fieldName)
+ .orElseThrow(() -> new HoodieException("Field '" + fieldName + "' not
found in schema: " + schema));
int fieldIdx = schemaField.pos();
TypeInfo fieldTypeInfo = fieldTypes.get(fieldIdx);
- Schema fieldSchema =
AvroSchemaUtils.getNonNullTypeFromUnion(schemaField.schema());
+ HoodieSchema fieldSchema = schemaField.schema().getNonNullType();
StructField structField =
structObjectInspector.getStructFieldRef(fieldName);
if (structField == null) {
@@ -216,9 +214,9 @@ public class HiveAvroSerializer {
final TypeInfo typeInfo;
final ObjectInspector objectInspector;
final Object object;
- final Schema schema;
+ final HoodieSchema schema;
- FieldContext(Object object, ObjectInspector objectInspector, TypeInfo
typeInfo, Schema schema) {
+ FieldContext(Object object, ObjectInspector objectInspector, TypeInfo
typeInfo, HoodieSchema schema) {
this.object = object;
this.objectInspector = objectInspector;
this.typeInfo = typeInfo;
@@ -226,7 +224,7 @@ public class HiveAvroSerializer {
}
}
- private static final Schema STRING_SCHEMA =
Schema.create(Schema.Type.STRING);
+ private static final HoodieSchema STRING_SCHEMA =
HoodieSchema.create(HoodieSchemaType.STRING);
public GenericRecord serialize(Object o) {
if (recordSchema == null) {
@@ -235,10 +233,10 @@ public class HiveAvroSerializer {
return serialize(o, recordSchema);
}
- public GenericRecord serialize(Object o, Schema schema) {
+ public GenericRecord serialize(Object o, HoodieSchema schema) {
StructObjectInspector soi = objectInspector;
- GenericData.Record record = new GenericData.Record(schema);
+ GenericData.Record record = new GenericData.Record(schema.toAvroSchema());
List<? extends StructField> outputFieldRefs = soi.getAllStructFieldRefs();
if (outputFieldRefs.size() != columnNames.size()) {
@@ -251,7 +249,7 @@ public class HiveAvroSerializer {
List<Object> structFieldsDataAsList = soi.getStructFieldsDataAsList(o);
for (int i = 0; i < size; i++) {
- Schema.Field field = schema.getFields().get(i);
+ HoodieSchemaField field = schema.getFields().get(i);
if (i >= columnTypes.size()) {
break;
}
@@ -268,28 +266,27 @@ public class HiveAvroSerializer {
return record;
}
- private void setUpRecordFieldFromWritable(TypeInfo typeInfo, Object
structFieldData, ObjectInspector fieldOI, GenericData.Record record,
Schema.Field field) {
+ private void setUpRecordFieldFromWritable(TypeInfo typeInfo, Object
structFieldData, ObjectInspector fieldOI, GenericData.Record record,
HoodieSchemaField field) {
+ // In Avro/HoodieSchema, field.defaultVal() returns:
+ // - JsonProperties.Null / HoodieSchema.NULL_VALUE if the default is explicitly null
+ // - null / an empty Option if the field has no default value
+ // - some value if the field has an actual default
Object val = serialize(typeInfo, fieldOI, structFieldData, field.schema());
- if (val == null) {
- if (field.defaultVal() instanceof JsonProperties.Null) {
- record.put(field.name(), null);
- } else {
- record.put(field.name(), field.defaultVal());
- }
- } else {
- record.put(field.name(), val);
- }
+ Object recordValue = val != null ? val : field.defaultVal()
+ .map(defaultVal -> defaultVal == HoodieSchema.NULL_VALUE ? null :
defaultVal)
+ .orElse(null);
+ record.put(field.name(), recordValue);
}
- private Object serialize(TypeInfo typeInfo, ObjectInspector fieldOI, Object
structFieldData, Schema schema) throws HoodieException {
+ private Object serialize(TypeInfo typeInfo, ObjectInspector fieldOI, Object
structFieldData, HoodieSchema schema) throws HoodieException {
if (null == structFieldData) {
return null;
}
- schema = AvroSchemaUtils.getNonNullTypeFromUnion(schema);
+ schema = schema.getNonNullType();
/* Because we use Hive's 'string' type when Avro calls for enum, we have
to expressly check for enum-ness */
- if (Schema.Type.ENUM.equals(schema.getType())) {
+ if (HoodieSchemaType.ENUM == schema.getType()) {
assert fieldOI instanceof PrimitiveObjectInspector;
return serializeEnum((PrimitiveObjectInspector) fieldOI,
structFieldData, schema);
}
@@ -339,48 +336,51 @@ public class HiveAvroSerializer {
}
};
- private Object serializeEnum(PrimitiveObjectInspector fieldOI, Object
structFieldData, Schema schema) throws HoodieException {
+ private Object serializeEnum(PrimitiveObjectInspector fieldOI, Object
structFieldData, HoodieSchema schema) throws HoodieException {
try {
- return enums.retrieve(schema).retrieve(serializePrimitive(fieldOI,
structFieldData, schema));
+ return
enums.retrieve(schema.toAvroSchema()).retrieve(serializePrimitive(fieldOI,
structFieldData, schema));
} catch (Exception e) {
throw new HoodieException(e);
}
}
- private Object serializeStruct(StructTypeInfo typeInfo,
StructObjectInspector ssoi, Object o, Schema schema) {
+ private Object serializeStruct(StructTypeInfo typeInfo,
StructObjectInspector ssoi, Object o, HoodieSchema schema) {
int size = schema.getFields().size();
List<? extends StructField> allStructFieldRefs =
ssoi.getAllStructFieldRefs();
List<Object> structFieldsDataAsList = ssoi.getStructFieldsDataAsList(o);
- GenericData.Record record = new GenericData.Record(schema);
+ GenericData.Record record = new GenericData.Record(schema.toAvroSchema());
ArrayList<TypeInfo> allStructFieldTypeInfos =
typeInfo.getAllStructFieldTypeInfos();
for (int i = 0; i < size; i++) {
- Schema.Field field = schema.getFields().get(i);
+ HoodieSchemaField field = schema.getFields().get(i);
setUpRecordFieldFromWritable(allStructFieldTypeInfos.get(i),
structFieldsDataAsList.get(i),
allStructFieldRefs.get(i).getFieldObjectInspector(), record, field);
}
return record;
}
- private Object serializePrimitive(PrimitiveObjectInspector fieldOI, Object
structFieldData, Schema schema) throws HoodieException {
+ private Object serializePrimitive(PrimitiveObjectInspector fieldOI, Object
structFieldData, HoodieSchema schema) throws HoodieException {
switch (fieldOI.getPrimitiveCategory()) {
case BINARY:
- if (schema.getType() == Schema.Type.BYTES) {
+ if (schema.getType() == HoodieSchemaType.BYTES) {
return AvroSerdeUtils.getBufferFromBytes((byte[])
fieldOI.getPrimitiveJavaObject(structFieldData));
- } else if (schema.getType() == Schema.Type.FIXED) {
- GenericData.Fixed fixed = new GenericData.Fixed(schema, (byte[])
fieldOI.getPrimitiveJavaObject(structFieldData));
+ } else if (schema.getType() == HoodieSchemaType.FIXED) {
+ GenericData.Fixed fixed = new
GenericData.Fixed(schema.toAvroSchema(), (byte[])
fieldOI.getPrimitiveJavaObject(structFieldData));
return fixed;
} else {
throw new HoodieException("Unexpected Avro schema for Binary
TypeInfo: " + schema.getType());
}
case DECIMAL:
HiveDecimal dec = (HiveDecimal)
fieldOI.getPrimitiveJavaObject(structFieldData);
- LogicalTypes.Decimal decimal = (LogicalTypes.Decimal)
schema.getLogicalType();
+ if (schema.getType() != HoodieSchemaType.DECIMAL) {
+ throw new HoodieException("Unexpected schema type for DECIMAL: " +
schema.getType());
+ }
+ HoodieSchema.Decimal decimal = (HoodieSchema.Decimal) schema;
BigDecimal bd = new
BigDecimal(dec.toString()).setScale(decimal.getScale());
- if (schema.getType() == Schema.Type.BYTES) {
- return HoodieAvroUtils.DECIMAL_CONVERSION.toBytes(bd, schema,
decimal);
+ if (decimal.isFixed()) {
+ return HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd,
schema.toAvroSchema(), decimal.toAvroSchema().getLogicalType());
} else {
- return HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, schema,
decimal);
+ return HoodieAvroUtils.DECIMAL_CONVERSION.toBytes(bd,
schema.toAvroSchema(), decimal.toAvroSchema().getLogicalType());
}
case CHAR:
HiveChar ch = (HiveChar)
fieldOI.getPrimitiveJavaObject(structFieldData);
@@ -396,7 +396,7 @@ public class HiveAvroSerializer {
case TIMESTAMP:
return HoodieHiveUtils.getMills(structFieldData);
case INT:
- if (schema.getLogicalType() != null &&
schema.getLogicalType().getName().equals("date")) {
+ if (schema.getType() == HoodieSchemaType.DATE) {
return new
WritableDateObjectInspector().getPrimitiveWritableObject(structFieldData).getDays();
}
return fieldOI.getPrimitiveJavaObject(structFieldData);
@@ -409,7 +409,7 @@ public class HiveAvroSerializer {
}
}
- private Object serializeUnion(UnionTypeInfo typeInfo, UnionObjectInspector
fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+ private Object serializeUnion(UnionTypeInfo typeInfo, UnionObjectInspector
fieldOI, Object structFieldData, HoodieSchema schema) throws HoodieException {
byte tag = fieldOI.getTag(structFieldData);
// Invariant that Avro's tag ordering must match Hive's.
@@ -419,20 +419,21 @@ public class HiveAvroSerializer {
schema.getTypes().get(tag));
}
- private Object serializeList(ListTypeInfo typeInfo, ListObjectInspector
fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+ private Object serializeList(ListTypeInfo typeInfo, ListObjectInspector
fieldOI, Object structFieldData, HoodieSchema schema) throws HoodieException {
List<?> list = fieldOI.getList(structFieldData);
- List<Object> deserialized = new GenericData.Array<Object>(list.size(),
schema);
+ List<Object> deserialized = new GenericData.Array<>(list.size(),
schema.toAvroSchema());
TypeInfo listElementTypeInfo = typeInfo.getListElementTypeInfo();
ObjectInspector listElementObjectInspector =
fieldOI.getListElementObjectInspector();
// NOTE: We have to resolve nullable schema, since Avro permits array
elements
// to be null
- Schema arrayNestedType =
AvroSchemaUtils.getNonNullTypeFromUnion(schema.getElementType());
- Schema elementType;
+ HoodieSchema arrayNestedType = schema.getElementType().getNonNullType();
+ HoodieSchema elementType;
if (listElementObjectInspector.getCategory() ==
ObjectInspector.Category.PRIMITIVE) {
elementType = arrayNestedType;
} else {
- elementType = arrayNestedType.getField("element") == null ?
arrayNestedType : arrayNestedType.getField("element").schema();
+ elementType =
+
arrayNestedType.getField("element").map(HoodieSchemaField::schema).orElse(arrayNestedType);
}
for (int i = 0; i < list.size(); i++) {
Object childFieldData = list.get(i);
@@ -445,7 +446,7 @@ public class HiveAvroSerializer {
return deserialized;
}
- private Object serializeMap(MapTypeInfo typeInfo, MapObjectInspector
fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+ private Object serializeMap(MapTypeInfo typeInfo, MapObjectInspector
fieldOI, Object structFieldData, HoodieSchema schema) throws HoodieException {
// Avro only allows maps with string keys
if (!mapHasStringKey(fieldOI.getMapKeyObjectInspector())) {
throw new HoodieException("Avro only supports maps with keys as Strings.
Current Map is: " + typeInfo.toString());
@@ -456,7 +457,7 @@ public class HiveAvroSerializer {
TypeInfo mapKeyTypeInfo = typeInfo.getMapKeyTypeInfo();
TypeInfo mapValueTypeInfo = typeInfo.getMapValueTypeInfo();
Map<?, ?> map = fieldOI.getMap(structFieldData);
- Schema valueType = schema.getValueType();
+ HoodieSchema valueType = schema.getValueType();
Map<Object, Object> deserialized = new LinkedHashMap<Object,
Object>(fieldOI.getMapSize(structFieldData));
@@ -473,10 +474,10 @@ public class HiveAvroSerializer {
&& ((PrimitiveObjectInspector)
mapKeyObjectInspector).getPrimitiveCategory().equals(PrimitiveObjectInspector.PrimitiveCategory.STRING);
}
- public static GenericRecord rewriteRecordIgnoreResultCheck(GenericRecord
oldRecord, Schema newSchema) {
- GenericRecord newRecord = new GenericData.Record(newSchema);
+ public static GenericRecord rewriteRecordIgnoreResultCheck(GenericRecord
oldRecord, HoodieSchema newSchema) {
+ GenericRecord newRecord = new GenericData.Record(newSchema.toAvroSchema());
boolean isSpecificRecord = oldRecord instanceof SpecificRecordBase;
- for (Schema.Field f : newSchema.getFields()) {
+ for (HoodieSchemaField f : newSchema.getFields()) {
if (!(isSpecificRecord && isMetadataField(f.name()))) {
copyOldValueOrSetDefault(oldRecord, newRecord, f);
}
@@ -484,7 +485,7 @@ public class HiveAvroSerializer {
return newRecord;
}
- private static void copyOldValueOrSetDefault(GenericRecord oldRecord,
GenericRecord newRecord, Schema.Field field) {
+ private static void copyOldValueOrSetDefault(GenericRecord oldRecord,
GenericRecord newRecord, HoodieSchemaField field) {
Schema oldSchema = oldRecord.getSchema();
Object fieldValue = oldSchema.getField(field.name()) == null ? null :
oldRecord.get(field.name());
@@ -493,12 +494,13 @@ public class HiveAvroSerializer {
Object newFieldValue;
if (fieldValue instanceof GenericRecord) {
GenericRecord record = (GenericRecord) fieldValue;
- newFieldValue = rewriteRecordIgnoreResultCheck(record,
AvroSchemaUtils.resolveUnionSchema(field.schema(),
record.getSchema().getFullName()));
+ HoodieSchema fieldSchema =
HoodieSchemaUtils.resolveUnionSchema(field.schema(),
record.getSchema().getFullName());
+ newFieldValue = rewriteRecordIgnoreResultCheck(record, fieldSchema);
} else {
newFieldValue = fieldValue;
}
newRecord.put(field.name(), newFieldValue);
- } else if (field.defaultVal() instanceof JsonProperties.Null) {
+ } else if (field.defaultVal() == HoodieSchema.NULL_VALUE) {
newRecord.put(field.name(), null);
} else {
newRecord.put(field.name(), field.defaultVal());
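For reference, a sketch of the serializer's HoodieSchema-based entry points after this change
(signatures taken from the hunks above; the arrayWritable value and SIMPLE_SCHEMA string are
placeholders borrowed from the tests further below):

    HoodieSchema schema = HoodieSchema.parse(SIMPLE_SCHEMA);
    HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
    GenericRecord avroRecord = serializer.serialize(arrayWritable); // ArrayWritable -> Avro GenericRecord
    Object id = serializer.getValue(arrayWritable, "id");           // nested field lookup by name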
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveTypeUtils.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveTypeUtils.java
index a5383b63ebb6..0c4dcc766472 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveTypeUtils.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveTypeUtils.java
@@ -18,17 +18,17 @@
package org.apache.hudi.hadoop.utils;
-import org.apache.hudi.avro.AvroSchemaUtils;
-
-import static org.apache.avro.Schema.Type.BOOLEAN;
-import static org.apache.avro.Schema.Type.BYTES;
-import static org.apache.avro.Schema.Type.DOUBLE;
-import static org.apache.avro.Schema.Type.FIXED;
-import static org.apache.avro.Schema.Type.FLOAT;
-import static org.apache.avro.Schema.Type.INT;
-import static org.apache.avro.Schema.Type.LONG;
-import static org.apache.avro.Schema.Type.NULL;
-import static org.apache.avro.Schema.Type.STRING;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.ValidationUtils;
+
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
+import org.apache.hadoop.hive.serde2.avro.InstanceCache;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import java.util.ArrayList;
import java.util.Collections;
@@ -38,13 +38,24 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.avro.Schema;
-import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
-import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
-import org.apache.hadoop.hive.serde2.avro.InstanceCache;
-import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import static org.apache.hudi.common.schema.HoodieSchemaType.ARRAY;
+import static org.apache.hudi.common.schema.HoodieSchemaType.BOOLEAN;
+import static org.apache.hudi.common.schema.HoodieSchemaType.BYTES;
+import static org.apache.hudi.common.schema.HoodieSchemaType.DATE;
+import static org.apache.hudi.common.schema.HoodieSchemaType.DECIMAL;
+import static org.apache.hudi.common.schema.HoodieSchemaType.DOUBLE;
+import static org.apache.hudi.common.schema.HoodieSchemaType.ENUM;
+import static org.apache.hudi.common.schema.HoodieSchemaType.FIXED;
+import static org.apache.hudi.common.schema.HoodieSchemaType.FLOAT;
+import static org.apache.hudi.common.schema.HoodieSchemaType.INT;
+import static org.apache.hudi.common.schema.HoodieSchemaType.LONG;
+import static org.apache.hudi.common.schema.HoodieSchemaType.MAP;
+import static org.apache.hudi.common.schema.HoodieSchemaType.NULL;
+import static org.apache.hudi.common.schema.HoodieSchemaType.RECORD;
+import static org.apache.hudi.common.schema.HoodieSchemaType.STRING;
+import static org.apache.hudi.common.schema.HoodieSchemaType.TIME;
+import static org.apache.hudi.common.schema.HoodieSchemaType.TIMESTAMP;
+import static org.apache.hudi.common.schema.HoodieSchemaType.UNION;
/**
* Convert an Avro Schema to a Hive TypeInfo
@@ -66,9 +77,9 @@ public class HiveTypeUtils {
// smallint
// Map of Avro's primitive types to Hives (for those that are supported by
both)
- private static final Map<Schema.Type, TypeInfo> PRIMITIVE_TYPE_TO_TYPE_INFO
= initTypeMap();
- private static Map<Schema.Type, TypeInfo> initTypeMap() {
- Map<Schema.Type, TypeInfo> theMap = new Hashtable<Schema.Type, TypeInfo>();
+ private static final Map<HoodieSchemaType, TypeInfo>
PRIMITIVE_TYPE_TO_TYPE_INFO = initTypeMap();
+ private static Map<HoodieSchemaType, TypeInfo> initTypeMap() {
+ Map<HoodieSchemaType, TypeInfo> theMap = new Hashtable<>();
theMap.put(NULL, TypeInfoFactory.getPrimitiveTypeInfo("void"));
theMap.put(BOOLEAN, TypeInfoFactory.getPrimitiveTypeInfo("boolean"));
theMap.put(INT, TypeInfoFactory.getPrimitiveTypeInfo("int"));
@@ -90,7 +101,7 @@ public class HiveTypeUtils {
* from the schema.
* @throws AvroSerdeException for problems during conversion.
*/
- public static List<TypeInfo> generateColumnTypes(Schema schema) throws
AvroSerdeException {
+ public static List<TypeInfo> generateColumnTypes(HoodieSchema schema) throws
AvroSerdeException {
return generateColumnTypes(schema, null);
}
@@ -105,27 +116,28 @@ public class HiveTypeUtils {
* from the schema.
* @throws AvroSerdeException for problems during conversion.
*/
- public static List<TypeInfo> generateColumnTypes(Schema schema,
- Set<Schema> seenSchemas)
throws AvroSerdeException {
- List<Schema.Field> fields = schema.getFields();
+ public static List<TypeInfo> generateColumnTypes(HoodieSchema schema,
+ Set<HoodieSchema>
seenSchemas) throws AvroSerdeException {
+ List<HoodieSchemaField> fields = schema.getFields();
List<TypeInfo> types = new ArrayList<TypeInfo>(fields.size());
- for (Schema.Field field : fields) {
+ for (HoodieSchemaField field : fields) {
types.add(generateTypeInfo(field.schema(), seenSchemas));
}
return types;
}
- static InstanceCache<Schema, TypeInfo> typeInfoCache = new
InstanceCache<Schema, TypeInfo>() {
+ static InstanceCache<HoodieSchema, TypeInfo> typeInfoCache = new
InstanceCache<HoodieSchema, TypeInfo>() {
@Override
- protected TypeInfo makeInstance(Schema s,
- Set<Schema> seenSchemas)
+ protected TypeInfo makeInstance(HoodieSchema s,
+ Set<HoodieSchema> seenSchemas)
throws AvroSerdeException {
return generateTypeInfoWorker(s, seenSchemas);
}
};
+
/**
* Convert an Avro Schema into an equivalent Hive TypeInfo.
* @param schema to record. Must be of record type.
@@ -134,33 +146,24 @@ public class HiveTypeUtils {
* @return TypeInfo matching the Avro schema
* @throws AvroSerdeException for any problems during conversion.
*/
- public static TypeInfo generateTypeInfo(Schema schema,
- Set<Schema> seenSchemas) throws
AvroSerdeException {
+ public static TypeInfo generateTypeInfo(HoodieSchema schema,
+ Set<HoodieSchema> seenSchemas)
throws AvroSerdeException {
// For bytes type, it can be mapped to decimal.
- Schema.Type type = schema.getType();
- // HUDI MODIFICATION ADDED "|| type == FIXED"
- if ((type == BYTES || type == FIXED) && AvroSerDe.DECIMAL_TYPE_NAME
- .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
- int precision = 0;
- int scale = 0;
- try {
- precision =
getIntValue(schema.getObjectProp(AvroSerDe.AVRO_PROP_PRECISION));
- scale = getIntValue(schema.getObjectProp(AvroSerDe.AVRO_PROP_SCALE));
- } catch (Exception ex) {
- throw new AvroSerdeException("Failed to obtain scale value from file
schema: " + schema, ex);
- }
-
+ HoodieSchemaType type = schema.getType();
+ if (type == DECIMAL) {
+ HoodieSchema.Decimal decimalSchema = (HoodieSchema.Decimal) schema;
+ int precision = decimalSchema.getPrecision();
+ int scale = decimalSchema.getScale();
try {
HiveDecimalUtils.validateParameter(precision, scale);
} catch (Exception ex) {
throw new AvroSerdeException("Invalid precision or scale for decimal
type", ex);
}
-
return TypeInfoFactory.getDecimalTypeInfo(precision, scale);
}
if (type == STRING
- &&
AvroSerDe.CHAR_TYPE_NAME.equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE)))
{
+ && AvroSerDe.CHAR_TYPE_NAME.equalsIgnoreCase((String)
schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
int maxLength = 0;
try {
maxLength = getIntFromSchema(schema, AvroSerDe.AVRO_PROP_MAX_LENGTH);
@@ -171,7 +174,7 @@ public class HiveTypeUtils {
}
if (type == STRING && AvroSerDe.VARCHAR_TYPE_NAME
- .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+ .equalsIgnoreCase((String)
schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
int maxLength = 0;
try {
maxLength = getIntFromSchema(schema, AvroSerDe.AVRO_PROP_MAX_LENGTH);
@@ -181,14 +184,33 @@ public class HiveTypeUtils {
return TypeInfoFactory.getVarcharTypeInfo(maxLength);
}
- if (type == INT
- &&
AvroSerDe.DATE_TYPE_NAME.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE)))
{
+ if (type == DATE) {
return TypeInfoFactory.dateTypeInfo;
}
- if (type == LONG
- &&
AvroSerDe.TIMESTAMP_TYPE_NAME.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE)))
{
- return TypeInfoFactory.timestampTypeInfo;
+ if (type == TIMESTAMP) {
+ HoodieSchema.Timestamp timestampSchema = (HoodieSchema.Timestamp) schema;
+ switch (timestampSchema.getPrecision()) {
+ case MILLIS:
+ // Only millis precision is supported natively by Hive; see AvroSerDe for the full support matrix
+ return TypeInfoFactory.timestampTypeInfo;
+ case MICROS:
+ return TypeInfoFactory.longTypeInfo;
+ default:
+ throw new IllegalArgumentException("Unsupported timestamp precision
for Timestamp schema: " + timestampSchema.getPrecision());
+ }
+ }
+
+ if (type == TIME) {
+ HoodieSchema.Time timeSchema = (HoodieSchema.Time) schema;
+ switch (timeSchema.getPrecision()) {
+ case MILLIS:
+ return TypeInfoFactory.intTypeInfo;
+ case MICROS:
+ return TypeInfoFactory.longTypeInfo;
+ default:
+ throw new IllegalArgumentException("Unsupported time precision for
Time schema: " + timeSchema.getPrecision());
+ }
}
return typeInfoCache.retrieve(schema, seenSchemas);
@@ -224,8 +246,8 @@ public class HiveTypeUtils {
}
// added this from AvroSerdeUtils in hive latest
- public static int getIntFromSchema(Schema schema, String name) {
- Object obj = schema.getObjectProp(name);
+ public static int getIntFromSchema(HoodieSchema schema, String name) {
+ Object obj = schema.getObjectProps().get(name);
if (obj instanceof String) {
return Integer.parseInt((String) obj);
} else if (obj instanceof Integer) {
@@ -236,15 +258,15 @@ public class HiveTypeUtils {
}
}
- private static TypeInfo generateTypeInfoWorker(Schema schema,
- Set<Schema> seenSchemas)
throws AvroSerdeException {
- // Avro requires NULLable types to be defined as unions of some type T
+ private static TypeInfo generateTypeInfoWorker(HoodieSchema schema,
+ Set<HoodieSchema>
seenSchemas) throws AvroSerdeException {
+ // HoodieSchema requires NULLable types to be defined as unions of some
type T
// and NULL. This is annoying and we're going to hide it from the user.
- if (AvroSchemaUtils.isNullable(schema)) {
- return generateTypeInfo(AvroSchemaUtils.getNonNullTypeFromUnion(schema),
seenSchemas);
+ if (schema.isNullable()) {
+ return generateTypeInfo(schema.getNonNullType(), seenSchemas);
}
- Schema.Type type = schema.getType();
+ HoodieSchemaType type = schema.getType();
if (PRIMITIVE_TYPE_TO_TYPE_INFO.containsKey(type)) {
return PRIMITIVE_TYPE_TO_TYPE_INFO.get(type);
}
@@ -259,12 +281,12 @@ public class HiveTypeUtils {
}
}
- private static TypeInfo generateRecordTypeInfo(Schema schema,
- Set<Schema> seenSchemas)
throws AvroSerdeException {
- assert schema.getType().equals(Schema.Type.RECORD);
+ private static TypeInfo generateRecordTypeInfo(HoodieSchema schema,
+ Set<HoodieSchema>
seenSchemas) throws AvroSerdeException {
+ ValidationUtils.checkArgument(schema.getType() == RECORD, () -> schema + "
is not a RECORD");
if (seenSchemas == null) {
- seenSchemas = Collections.newSetFromMap(new IdentityHashMap<Schema,
Boolean>());
+ seenSchemas = Collections.newSetFromMap(new IdentityHashMap<>());
} else if (seenSchemas.contains(schema)) {
throw new AvroSerdeException(
"Recursive schemas are not supported. Recursive schema was " + schema
@@ -272,9 +294,9 @@ public class HiveTypeUtils {
}
seenSchemas.add(schema);
- List<Schema.Field> fields = schema.getFields();
- List<String> fieldNames = new ArrayList<String>(fields.size());
- List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(fields.size());
+ List<HoodieSchemaField> fields = schema.getFields();
+ List<String> fieldNames = new ArrayList<>(fields.size());
+ List<TypeInfo> typeInfos = new ArrayList<>(fields.size());
for (int i = 0; i < fields.size(); i++) {
fieldNames.add(i, fields.get(i).name());
@@ -288,33 +310,32 @@ public class HiveTypeUtils {
* Generate a TypeInfo for an Avro Map. This is made slightly simpler in
that
* Avro only allows maps with strings for keys.
*/
- private static TypeInfo generateMapTypeInfo(Schema schema,
- Set<Schema> seenSchemas) throws
AvroSerdeException {
- assert schema.getType().equals(Schema.Type.MAP);
- Schema valueType = schema.getValueType();
+ private static TypeInfo generateMapTypeInfo(HoodieSchema schema,
+ Set<HoodieSchema> seenSchemas)
throws AvroSerdeException {
+ ValidationUtils.checkArgument(schema.getType() == MAP, () -> schema + " is not a MAP");
+ HoodieSchema valueType = schema.getValueType();
TypeInfo ti = generateTypeInfo(valueType, seenSchemas);
return
TypeInfoFactory.getMapTypeInfo(TypeInfoFactory.getPrimitiveTypeInfo("string"),
ti);
}
- private static TypeInfo generateArrayTypeInfo(Schema schema,
- Set<Schema> seenSchemas)
throws AvroSerdeException {
- assert schema.getType().equals(Schema.Type.ARRAY);
- Schema itemsType = schema.getElementType();
+ private static TypeInfo generateArrayTypeInfo(HoodieSchema schema,
+ Set<HoodieSchema> seenSchemas)
throws AvroSerdeException {
+ ValidationUtils.checkArgument(schema.getType() == ARRAY, () -> schema + "
is not an ARRAY");
+ HoodieSchema itemsType = schema.getElementType();
TypeInfo itemsTypeInfo = generateTypeInfo(itemsType, seenSchemas);
return TypeInfoFactory.getListTypeInfo(itemsTypeInfo);
}
- private static TypeInfo generateUnionTypeInfo(Schema schema,
- Set<Schema> seenSchemas)
throws AvroSerdeException {
- assert schema.getType().equals(Schema.Type.UNION);
- List<Schema> types = schema.getTypes();
-
+ private static TypeInfo generateUnionTypeInfo(HoodieSchema schema,
+ Set<HoodieSchema> seenSchemas)
throws AvroSerdeException {
+ ValidationUtils.checkArgument(schema.getType() == UNION, () -> schema + " is not a UNION");
+ List<HoodieSchema> types = schema.getTypes();
- List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(types.size());
+ List<TypeInfo> typeInfos = new ArrayList<>(types.size());
- for (Schema type : types) {
+ for (HoodieSchema type : types) {
typeInfos.add(generateTypeInfo(type, seenSchemas));
}
@@ -324,8 +345,8 @@ public class HiveTypeUtils {
// Hive doesn't have an Enum type, so we're going to treat them as Strings.
// During the deserialize/serialize stage we'll check for enumness and
// convert as such.
- private static TypeInfo generateEnumTypeInfo(Schema schema) {
- assert schema.getType().equals(Schema.Type.ENUM);
+ private static TypeInfo generateEnumTypeInfo(HoodieSchema schema) {
+ ValidationUtils.checkArgument(schema.getType() == ENUM, () -> schema + "
is not an ENUM");
return TypeInfoFactory.getPrimitiveTypeInfo("string");
}
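
Note: the recursion check above uses an identity-based set so that two
structurally equal but distinct record schemas reached while walking nested
fields are not conflated, while true recursion (the same schema instance
reappearing) is still caught. A minimal standalone sketch of the pattern
(hypothetical Node type, plain JDK, not Hudi code):

    import java.util.Collections;
    import java.util.IdentityHashMap;
    import java.util.Set;

    public class RecursionGuardSketch {
      // Hypothetical recursive node standing in for a record schema.
      static class Node {
        Node child;
      }

      static void visit(Node node, Set<Node> seen) {
        if (seen == null) {
          // Identity semantics: an equals()-based set could wrongly flag two
          // structurally equal schemas as "already seen".
          seen = Collections.newSetFromMap(new IdentityHashMap<>());
        } else if (seen.contains(node)) {
          throw new IllegalStateException("Recursive schemas are not supported: " + node);
        }
        seen.add(node);
        if (node.child != null) {
          visit(node.child, seen);
        }
      }

      public static void main(String[] args) {
        Node selfReferencing = new Node();
        selfReferencing.child = selfReferencing;
        try {
          visit(selfReferencing, null);
        } catch (IllegalStateException e) {
          System.out.println(e.getMessage()); // recursion detected on the second visit
        }
      }
    }
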
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
index d3c8e186551e..7a6337778407 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
@@ -27,6 +27,9 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.hadoop.utils.HiveAvroSerializer;
import org.junit.jupiter.api.BeforeEach;
@@ -50,9 +53,9 @@ class TestHoodieHiveRecord {
// Create a minimal HoodieHiveRecord instance with mocked dependencies
HoodieKey key = new HoodieKey("test-key", "test-partition");
ArrayWritable data = new ArrayWritable(Writable.class, new Writable[]{new
Text("test")});
- Schema schema = Schema.createRecord("TestRecord", null, null, false);
- schema.setFields(Collections.singletonList(new Schema.Field("testField",
Schema.create(Schema.Type.STRING), null, null)));
-
+ HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null,
false,
+ Collections.singletonList(HoodieSchemaField.of("testField",
HoodieSchema.create(HoodieSchemaType.STRING), null, null)));
+
// Create HoodieHiveRecord with mocked dependencies
hoodieHiveRecord = new HoodieHiveRecord(key, data, schema, new
HiveAvroSerializer(schema));
}
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java
index d22093c24223..d4dac0934a25 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java
@@ -19,15 +19,15 @@
package org.apache.hudi.hadoop.utils;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.exception.HoodieException;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -84,24 +84,24 @@ public class TestHiveAvroSerializer {
@Test
public void testSerialize() {
- Schema avroSchema = new Schema.Parser().parse(SIMPLE_SCHEMA);
- // create a test record with avroSchema
- GenericData.Record avroRecord = new GenericData.Record(avroSchema);
+ HoodieSchema schema = HoodieSchema.parse(SIMPLE_SCHEMA);
+ // create a test record with schema
+ GenericData.Record avroRecord = new
GenericData.Record(schema.toAvroSchema());
avroRecord.put("id", 1);
avroRecord.put("col1", 1000L);
avroRecord.put("col2", -5.001f);
avroRecord.put("col3", 12.999d);
- Schema currentDecimalType =
avroSchema.getField("col4").schema().getTypes().get(1);
- BigDecimal bd = new BigDecimal("123.456").setScale(((LogicalTypes.Decimal)
currentDecimalType.getLogicalType()).getScale());
- avroRecord.put("col4", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd,
currentDecimalType, currentDecimalType.getLogicalType()));
+ HoodieSchema.Decimal currentDecimalType = (HoodieSchema.Decimal)
schema.getField("col4").get().schema().getTypes().get(1);
+ BigDecimal bd = new
BigDecimal("123.456").setScale(currentDecimalType.getScale());
+ avroRecord.put("col4", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd,
currentDecimalType.toAvroSchema(),
currentDecimalType.toAvroSchema().getLogicalType()));
avroRecord.put("col5", "2011-01-01");
avroRecord.put("col6", 18987);
avroRecord.put("col7", 1640491505111222L);
avroRecord.put("col8", false);
ByteBuffer bb = ByteBuffer.wrap(new byte[]{97, 48, 53});
avroRecord.put("col9", bb);
- assertTrue(GenericData.get().validate(avroSchema, avroRecord));
- ArrayWritable writable = (ArrayWritable)
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, avroSchema,
true);
+ assertTrue(GenericData.get().validate(schema.toAvroSchema(), avroRecord));
+ ArrayWritable writable = (ArrayWritable)
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord,
schema.toAvroSchema(), true);
List<Writable> writableList =
Arrays.stream(writable.get()).collect(Collectors.toList());
writableList.remove(writableList.size() - 1);
@@ -110,20 +110,20 @@ public class TestHiveAvroSerializer {
List<TypeInfo> columnTypeList =
createHiveTypeInfoFrom("int,bigint,float,double,decimal(10,4),string,date,timestamp,boolean,binary,date");
List<String> columnNameList =
createHiveColumnsFrom("id,col1,col2,col3,col4,col5,col6,col7,col8,col9,par");
StructTypeInfo rowTypeInfo = (StructTypeInfo)
TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
- GenericRecord testRecord = new HiveAvroSerializer(new
ArrayWritableObjectInspector(rowTypeInfo), columnNameList,
columnTypeList).serialize(writable, avroSchema);
- assertTrue(GenericData.get().validate(avroSchema, testRecord));
+ GenericRecord testRecord = new HiveAvroSerializer(new
ArrayWritableObjectInspector(rowTypeInfo), columnNameList,
columnTypeList).serialize(writable, schema);
+ assertTrue(GenericData.get().validate(schema.toAvroSchema(), testRecord));
// test
List<TypeInfo> columnTypeListClip =
createHiveTypeInfoFrom("int,bigint,float,double,decimal(10,4),string,date,timestamp,boolean,binary");
List<String> columnNameListClip =
createHiveColumnsFrom("id,col1,col2,col3,col4,col5,col6,col7,col8,col9");
StructTypeInfo rowTypeInfoClip = (StructTypeInfo)
TypeInfoFactory.getStructTypeInfo(columnNameListClip, columnTypeListClip);
- GenericRecord testRecordClip = new HiveAvroSerializer(new
ArrayWritableObjectInspector(rowTypeInfoClip), columnNameListClip,
columnTypeListClip).serialize(clipWritable, avroSchema);
- assertTrue(GenericData.get().validate(avroSchema, testRecordClip));
+ GenericRecord testRecordClip = new HiveAvroSerializer(new
ArrayWritableObjectInspector(rowTypeInfoClip), columnNameListClip,
columnTypeListClip).serialize(clipWritable, schema);
+ assertTrue(GenericData.get().validate(schema.toAvroSchema(),
testRecordClip));
}
@Test
public void testNestedValueSerialize() {
- Schema nestedSchema = new Schema.Parser().parse(NESTED_SCHEMA);
- GenericRecord avroRecord = new GenericData.Record(nestedSchema);
+ HoodieSchema nestedSchema = HoodieSchema.parse(NESTED_SCHEMA);
+ GenericRecord avroRecord = new
GenericData.Record(nestedSchema.toAvroSchema());
avroRecord.put("firstname", "person1");
avroRecord.put("lastname", "person2");
GenericArray scores = new
GenericData.Array<>(avroRecord.getSchema().getField("scores").schema(),
Arrays.asList(1,2));
@@ -136,14 +136,14 @@ public class TestHiveAvroSerializer {
GenericArray teachers = new
GenericData.Array<>(avroRecord.getSchema().getField("teachers").schema(),
Arrays.asList(studentRecord));
avroRecord.put("teachers", teachers);
- assertTrue(GenericData.get().validate(nestedSchema, avroRecord));
- ArrayWritable writable = (ArrayWritable)
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, nestedSchema,
true);
+ assertTrue(GenericData.get().validate(nestedSchema.toAvroSchema(),
avroRecord));
+ ArrayWritable writable = (ArrayWritable)
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord,
nestedSchema.toAvroSchema(), true);
List<TypeInfo> columnTypeList =
createHiveTypeInfoFrom("string,string,array<int>,struct<firstname:string,lastname:string>,array<struct<firstname:string,lastname:string>>");
List<String> columnNameList =
createHiveColumnsFrom("firstname,lastname,arrayRecord,student,teachers");
StructTypeInfo rowTypeInfo = (StructTypeInfo)
TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
GenericRecord testRecord = new HiveAvroSerializer(new
ArrayWritableObjectInspector(rowTypeInfo), columnNameList,
columnTypeList).serialize(writable, nestedSchema);
- assertTrue(GenericData.get().validate(nestedSchema, testRecord));
+ assertTrue(GenericData.get().validate(nestedSchema.toAvroSchema(),
testRecord));
}
private List<String> createHiveColumnsFrom(final String columnNamesStr) {
@@ -198,7 +198,7 @@ public class TestHiveAvroSerializer {
@Test
public void testGetTopLevelFields() {
- Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+ HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -216,7 +216,7 @@ public class TestHiveAvroSerializer {
@Test
public void testGetNestedFields() {
- Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+ HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -234,7 +234,7 @@ public class TestHiveAvroSerializer {
@Test
public void testInvalidFieldNameThrows() {
- Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+ HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -257,7 +257,7 @@ public class TestHiveAvroSerializer {
@Test
public void testGetValueFromArrayOrMap() {
- Schema schema = new Schema.Parser().parse(SCHEMA_WITH_ARRAY_AND_MAP);
+ HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_ARRAY_AND_MAP);
HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
ArrayWritable tagsArray = new ArrayWritable(Text.class, new Text[]{
@@ -298,7 +298,7 @@ public class TestHiveAvroSerializer {
@Test
public void testGetJavaTopLevelFields() {
- Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+ HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -316,7 +316,7 @@ public class TestHiveAvroSerializer {
@Test
public void testGetJavaNestedFields() {
- Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+ HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -334,7 +334,7 @@ public class TestHiveAvroSerializer {
@Test
public void testGetJavaArrayAndMap() {
- Schema schema = new Schema.Parser().parse(SCHEMA_WITH_ARRAY_AND_MAP);
+ HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_ARRAY_AND_MAP);
HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
ArrayWritable tagsArray = new ArrayWritable(Text.class, new Text[]{
@@ -381,7 +381,7 @@ public class TestHiveAvroSerializer {
@Test
public void testGetJavaInvalidFieldAccess() {
- Schema schema = new Schema.Parser().parse(SCHEMA_WITH_ARRAY_AND_MAP);
+ HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_ARRAY_AND_MAP);
HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
ArrayWritable tagsArray = new ArrayWritable(Text.class, new Text[]{
@@ -409,4 +409,203 @@ public class TestHiveAvroSerializer {
serializer.getValueAsJava(record, "properties.value");
});
}
+
+ @Test
+ public void testSerializeDecimalBackedByBytes() {
+ // Create schema with BYTES-backed decimal (not FIXED)
+ String schemaWithBytesDecimal =
"{\"type\":\"record\",\"name\":\"test_record\",\"fields\":["
+ + "{\"name\":\"id\",\"type\":\"int\"},"
+ +
"{\"name\":\"amount\",\"type\":[\"null\",{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}],\"default\":null}"
+ + "]}";
+
+ HoodieSchema schema = HoodieSchema.parse(schemaWithBytesDecimal);
+
+ // Create an Avro record with BYTES-backed decimal
+ GenericData.Record avroRecord = new
GenericData.Record(schema.toAvroSchema());
+ avroRecord.put("id", 42);
+
+ HoodieSchema.Decimal decimalType = (HoodieSchema.Decimal)
schema.getField("amount").get().schema().getTypes().get(1);
+ BigDecimal bd = new BigDecimal("1234.56").setScale(decimalType.getScale());
+ ByteBuffer decimalBytes = HoodieAvroUtils.DECIMAL_CONVERSION.toBytes(bd,
decimalType.toAvroSchema(), decimalType.toAvroSchema().getLogicalType());
+ avroRecord.put("amount", decimalBytes);
+
+ assertTrue(GenericData.get().validate(schema.toAvroSchema(), avroRecord));
+
+ // Convert to ArrayWritable
+ ArrayWritable writable = (ArrayWritable)
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord,
schema.toAvroSchema(), true);
+
+ // Set up Hive types and serializer
+ List<TypeInfo> columnTypeList =
createHiveTypeInfoFrom("int,decimal(10,2)");
+ List<String> columnNameList = createHiveColumnsFrom("id,amount");
+ StructTypeInfo rowTypeInfo = (StructTypeInfo)
TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+
+ // Serialize and verify
+ GenericRecord testRecord = new HiveAvroSerializer(new
ArrayWritableObjectInspector(rowTypeInfo), columnNameList,
columnTypeList).serialize(writable, schema);
+ assertTrue(GenericData.get().validate(schema.toAvroSchema(), testRecord));
+
+ // Verify the decimal value is correctly serialized
+ assertEquals(42, testRecord.get("id"));
+ ByteBuffer resultBytes = (ByteBuffer) testRecord.get("amount");
+ BigDecimal resultDecimal =
HoodieAvroUtils.DECIMAL_CONVERSION.fromBytes(resultBytes,
decimalType.toAvroSchema(), decimalType.toAvroSchema().getLogicalType());
+ assertEquals(bd, resultDecimal);
+ }
+
+ @Test
+ public void testSerializeDecimalBackedByFixed() {
+ // Create schema with FIXED-backed decimal (existing test covers this but
making it explicit)
+ String schemaWithFixedDecimal =
"{\"type\":\"record\",\"name\":\"test_record\",\"fields\":["
+ + "{\"name\":\"id\",\"type\":\"int\"},"
+ +
"{\"name\":\"amount\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"fixed_decimal\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}],\"default\":null}"
+ + "]}";
+
+ HoodieSchema schema = HoodieSchema.parse(schemaWithFixedDecimal);
+
+ // Create an Avro record with FIXED-backed decimal
+ GenericData.Record avroRecord = new
GenericData.Record(schema.toAvroSchema());
+ avroRecord.put("id", 42);
+
+ HoodieSchema.Decimal decimalType = (HoodieSchema.Decimal)
schema.getField("amount").get().schema().getTypes().get(1);
+ BigDecimal bd = new BigDecimal("1234.56").setScale(decimalType.getScale());
+ avroRecord.put("amount", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd,
decimalType.toAvroSchema(), decimalType.toAvroSchema().getLogicalType()));
+
+ assertTrue(GenericData.get().validate(schema.toAvroSchema(), avroRecord));
+
+ // Convert to ArrayWritable
+ ArrayWritable writable = (ArrayWritable)
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord,
schema.toAvroSchema(), true);
+
+ // Set up Hive types and serializer
+ List<TypeInfo> columnTypeList =
createHiveTypeInfoFrom("int,decimal(10,2)");
+ List<String> columnNameList = createHiveColumnsFrom("id,amount");
+ StructTypeInfo rowTypeInfo = (StructTypeInfo)
TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+
+ // Serialize and verify
+ GenericRecord testRecord = new HiveAvroSerializer(new
ArrayWritableObjectInspector(rowTypeInfo), columnNameList,
columnTypeList).serialize(writable, schema);
+ assertTrue(GenericData.get().validate(schema.toAvroSchema(), testRecord));
+
+ // Verify the decimal value is correctly serialized
+ assertEquals(42, testRecord.get("id"));
+ GenericData.Fixed resultFixed = (GenericData.Fixed)
testRecord.get("amount");
+ BigDecimal resultDecimal =
HoodieAvroUtils.DECIMAL_CONVERSION.fromFixed(resultFixed,
decimalType.toAvroSchema(), decimalType.toAvroSchema().getLogicalType());
+ assertEquals(bd, resultDecimal);
+ }
+
+ @Test
+ public void testGenerateColumnTypesForDecimalBackedByBytes() throws
AvroSerdeException {
+ // Test HiveTypeUtils.generateColumnTypes and convertToTypeInfo branch at
lines 152-162 for decimal backed by bytes
+ String schemaWithDecimalBytes =
"{\"type\":\"record\",\"name\":\"test_record\",\"fields\":["
+ + "{\"name\":\"id\",\"type\":\"int\"},"
+ +
"{\"name\":\"amount\",\"type\":[\"null\",{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}],\"default\":null}"
+ + "]}";
+
+ HoodieSchema schema = HoodieSchema.parse(schemaWithDecimalBytes);
+
+ // Test that HiveTypeUtils.generateColumnTypes correctly identifies
bytes-backed decimal as decimal type
+ List<TypeInfo> columnTypes = HiveTypeUtils.generateColumnTypes(schema);
+ assertEquals(2, columnTypes.size());
+ assertEquals(TypeInfoFactory.intTypeInfo, columnTypes.get(0));
+ // The second column should be decimal(10,2) type due to the decimal
logical type backed by bytes
+ assertEquals(TypeInfoFactory.getDecimalTypeInfo(10, 2),
columnTypes.get(1));
+ }
+
+ @Test
+ public void testGenerateColumnTypesForDecimalBackedByFixed() throws
AvroSerdeException {
+ // Test HiveTypeUtils.generateColumnTypes and convertToTypeInfo branch at
lines 152-162 for decimal backed by fixed
+ String schemaWithDecimalFixed =
"{\"type\":\"record\",\"name\":\"test_record\",\"fields\":["
+ + "{\"name\":\"id\",\"type\":\"int\"},"
+ +
"{\"name\":\"amount\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"fixed_decimal\",\"size\":6,\"logicalType\":\"decimal\",\"precision\":12,\"scale\":4}],\"default\":null}"
+ + "]}";
+
+ HoodieSchema schema = HoodieSchema.parse(schemaWithDecimalFixed);
+ assertInstanceOf(HoodieSchema.Decimal.class,
schema.getField("amount").get().getNonNullSchema());
+
+ // Test that HiveTypeUtils.generateColumnTypes correctly identifies
fixed-backed decimal as decimal type
+ List<TypeInfo> columnTypes = HiveTypeUtils.generateColumnTypes(schema);
+ assertEquals(2, columnTypes.size());
+ assertEquals(TypeInfoFactory.intTypeInfo, columnTypes.get(0));
+ // The second column should be decimal(12,4) type due to the decimal
logical type backed by fixed
+ assertEquals(TypeInfoFactory.getDecimalTypeInfo(12, 4),
columnTypes.get(1));
+ }
+
+ @Test
+ public void testGenerateColumnTypesForDate() throws AvroSerdeException {
+ // Test HiveTypeUtils.generateColumnTypes and convertToTypeInfo branch at
lines 187-189 for date
+ String schemaWithDate =
"{\"type\":\"record\",\"name\":\"test_record\",\"fields\":["
+ + "{\"name\":\"id\",\"type\":\"int\"},"
+ +
"{\"name\":\"birth_date\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}"
+ + "]}";
+
+ HoodieSchema schema = HoodieSchema.parse(schemaWithDate);
+
+ // Test that HiveTypeUtils.generateColumnTypes correctly identifies date
as date type
+ List<TypeInfo> columnTypes = HiveTypeUtils.generateColumnTypes(schema);
+ assertEquals(2, columnTypes.size());
+ assertEquals(TypeInfoFactory.intTypeInfo, columnTypes.get(0));
+ // The second column should be date type due to the date logical type
+ assertEquals(TypeInfoFactory.dateTypeInfo, columnTypes.get(1));
+ }
+
+ @Test
+ public void testGenerateColumnTypesForTimestampMillis() throws
AvroSerdeException {
+ // Test HiveTypeUtils.generateColumnTypes and convertToTypeInfo branch at
lines 192-194 for timestamp-millis
+ String schemaWithTimestampMillis =
"{\"type\":\"record\",\"name\":\"test_record\",\"fields\":["
+ + "{\"name\":\"id\",\"type\":\"int\"},"
+ +
"{\"name\":\"created_at\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}"
+ + "]}";
+
+ HoodieSchema schema = HoodieSchema.parse(schemaWithTimestampMillis);
+
+ // Test that HiveTypeUtils.generateColumnTypes correctly identifies
timestamp-millis as timestamp type
+ List<TypeInfo> columnTypes = HiveTypeUtils.generateColumnTypes(schema);
+ assertEquals(2, columnTypes.size());
+ assertEquals(TypeInfoFactory.intTypeInfo, columnTypes.get(0));
+ // The second column should be timestamp type due to the timestamp-millis
logical type
+ assertEquals(TypeInfoFactory.timestampTypeInfo, columnTypes.get(1));
+ }
+
+ @Test
+ public void testGenerateColumnTypesForTimestampMicros() throws
AvroSerdeException {
+ // Test timestamp-micros - AvroSerDe.TIMESTAMP_TYPE_NAME is only
"timestamp-millis", NOT "timestamp-micros"
+ String schemaWithTimestampMicros =
"{\"type\":\"record\",\"name\":\"test_record\",\"fields\":["
+ + "{\"name\":\"id\",\"type\":\"int\"},"
+ +
"{\"name\":\"updated_at\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}],\"default\":null}"
+ + "]}";
+
+ HoodieSchema schema = HoodieSchema.parse(schemaWithTimestampMicros);
+
+ List<TypeInfo> columnTypes = HiveTypeUtils.generateColumnTypes(schema);
+ assertEquals(TypeInfoFactory.intTypeInfo, columnTypes.get(0));
+ // The second column should be bigint type
+ assertEquals(TypeInfoFactory.longTypeInfo, columnTypes.get(1));
+ }
+
+ @Test
+ public void testGenerateColumnTypesForTimeMillis() throws AvroSerdeException
{
+ // Test time-millis logical type - there's no specific branch for TIME
type in HiveTypeUtils
+ String schemaWithTimeMillis =
"{\"type\":\"record\",\"name\":\"test_record\",\"fields\":["
+ + "{\"name\":\"id\",\"type\":\"int\"},"
+ +
"{\"name\":\"event_time\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"time-millis\"}],\"default\":null}"
+ + "]}";
+
+ HoodieSchema schema = HoodieSchema.parse(schemaWithTimeMillis);
+ List<TypeInfo> columnTypes = HiveTypeUtils.generateColumnTypes(schema);
+ assertEquals(TypeInfoFactory.intTypeInfo, columnTypes.get(0));
+ // The second column should be int type
+ assertEquals(TypeInfoFactory.intTypeInfo, columnTypes.get(1));
+ }
+
+ @Test
+ public void testGenerateColumnTypesForTimeMicros() throws AvroSerdeException
{
+ // Test time-micros logical type
+ String schemaWithTimeMicros =
"{\"type\":\"record\",\"name\":\"test_record\",\"fields\":["
+ + "{\"name\":\"id\",\"type\":\"int\"},"
+ +
"{\"name\":\"event_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"time-micros\"}],\"default\":null}"
+ + "]}";
+
+ HoodieSchema schema = HoodieSchema.parse(schemaWithTimeMicros);
+ List<TypeInfo> columnTypes = HiveTypeUtils.generateColumnTypes(schema);
+ assertEquals(TypeInfoFactory.intTypeInfo, columnTypes.get(0));
+ // The second column should be bigint type
+ assertEquals(TypeInfoFactory.longTypeInfo, columnTypes.get(1));
+ }
}
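
Note: the two decimal tests above round-trip the same BigDecimal through the
bytes-backed and fixed-backed Avro representations. A minimal standalone sketch
of that round trip using plain Avro, assuming Conversions.DecimalConversion
(the converter HoodieAvroUtils.DECIMAL_CONVERSION is expected to be an instance
of) is on the classpath:

    import java.math.BigDecimal;
    import java.nio.ByteBuffer;
    import org.apache.avro.Conversions;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    public class DecimalRoundTripSketch {
      public static void main(String[] args) {
        // decimal(10,2) carried on a BYTES type, as in the bytes-backed test above.
        Schema bytesDecimal = LogicalTypes.decimal(10, 2)
            .addToSchema(Schema.create(Schema.Type.BYTES));

        Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
        BigDecimal original = new BigDecimal("1234.56"); // scale 2 matches the logical type

        // Encode to Avro's in-memory bytes representation and decode back.
        ByteBuffer encoded = conversion.toBytes(original, bytesDecimal, bytesDecimal.getLogicalType());
        BigDecimal decoded = conversion.fromBytes(encoded, bytesDecimal, bytesDecimal.getLogicalType());

        System.out.println(original.equals(decoded)); // true: value and scale survive
      }
    }
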
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
index 1c5a9575ecd5..026a0cdbea61 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
@@ -80,14 +80,14 @@ public class TestHoodieArrayWritableSchemaUtils {
//We reuse the ArrayWritable, so we need to get the values before
projecting
ArrayWritable record =
convertArrayWritable(dataGen.generateGenericRecord());
- HiveAvroSerializer fromSerializer = new
HiveAvroSerializer(from.toAvroSchema());
+ HiveAvroSerializer fromSerializer = new HiveAvroSerializer(from);
Object tripType = fromSerializer.getValue(record, "trip_type");
Object currentTs = fromSerializer.getValue(record, "current_ts");
Object weight = fromSerializer.getValue(record, "weight");
//Make sure the projected fields can be read
ArrayWritable projectedRecord =
HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(record, from, to,
Collections.emptyMap());
- HiveAvroSerializer toSerializer = new
HiveAvroSerializer(to.toAvroSchema());
+ HiveAvroSerializer toSerializer = new HiveAvroSerializer(to);
assertEquals(tripType, toSerializer.getValue(projectedRecord,
"trip_type"));
assertEquals(currentTs, toSerializer.getValue(projectedRecord,
"current_ts"));
assertEquals(weight, toSerializer.getValue(projectedRecord, "weight"));
@@ -320,8 +320,8 @@ public class TestHoodieArrayWritableSchemaUtils {
Writable newWritable,
HoodieSchema newSchema
) throws AvroSerdeException {
- TypeInfo oldTypeInfo =
HiveTypeUtils.generateTypeInfo(oldSchema.toAvroSchema(),
Collections.emptySet());
- TypeInfo newTypeInfo =
HiveTypeUtils.generateTypeInfo(newSchema.toAvroSchema(),
Collections.emptySet());
+ TypeInfo oldTypeInfo = HiveTypeUtils.generateTypeInfo(oldSchema,
Collections.emptySet());
+ TypeInfo newTypeInfo = HiveTypeUtils.generateTypeInfo(newSchema,
Collections.emptySet());
ObjectInspector oldObjectInspector =
TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(oldTypeInfo);
ObjectInspector newObjectInspector =
TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(newTypeInfo);