This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch phase-18-HoodieAvroUtils-removal-p2
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit bea2dec72b3ffcb5d28686066b8bf0ea0181e31c
Author: voon <[email protected]>
AuthorDate: Fri Dec 12 18:30:24 2025 +0800

    Remove AvroSchemaUtils from HiveAvroSerializer and HiveTypeUtils
---
 .../hadoop/TestHoodieFileGroupReaderOnHive.java    |   2 +-
 .../org/apache/hudi/avro/AvroRecordContext.java    |  16 +-
 .../hudi/hadoop/HiveHoodieReaderContext.java       |   2 +-
 .../org/apache/hudi/hadoop/HiveRecordContext.java  |  11 +-
 .../org/apache/hudi/hadoop/HoodieHiveRecord.java   |  11 +-
 .../realtime/RealtimeCompactedRecordReader.java    |   4 +-
 .../hudi/hadoop/utils/HiveAvroSerializer.java      | 120 ++++++++-------
 .../apache/hudi/hadoop/utils/HiveTypeUtils.java    | 164 ++++++++++-----
 .../apache/hudi/hadoop/TestHoodieHiveRecord.java   |   9 +-
 .../hudi/hadoop/utils/TestHiveAvroSerializer.java  |  53 ++++---
 .../utils/TestHoodieArrayWritableSchemaUtils.java  |   8 +-
 11 files changed, 205 insertions(+), 195 deletions(-)

diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
index 05d2c898f868..47b4114dea06 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
@@ -121,7 +121,7 @@ public class TestHoodieFileGroupReaderOnHive extends HoodieFileGroupReaderOnJava
     List<HoodieSchemaField> fields = schema.getFields();
     setHiveColumnNameProps(fields, jobConf, USE_FAKE_PARTITION);
     try {
-      String columnTypes = HiveTypeUtils.generateColumnTypes(schema.toAvroSchema()).stream().map(TypeInfo::getTypeName).collect(Collectors.joining(","));
+      String columnTypes = HiveTypeUtils.generateColumnTypes(schema).stream().map(TypeInfo::getTypeName).collect(Collectors.joining(","));
       jobConf.set("columns.types", columnTypes + ",string");
     } catch (SerDeException e) {
       throw new RuntimeException(e);
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
index 9fe1ea3f0650..947a292196ae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieEmptyRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.AvroJavaTypeConverter;
@@ -32,7 +33,6 @@ import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;

-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
@@ -70,22 +70,20 @@ public class AvroRecordContext extends RecordContext<IndexedRecord> {
   public static Object getFieldValueFromIndexedRecord(
       IndexedRecord record,
       String fieldName) {
-    Schema currentSchema = record.getSchema();
+    HoodieSchema currentSchema = HoodieSchema.fromAvroSchema(record.getSchema());
     IndexedRecord currentRecord = record;
     String[] path = fieldName.split("\\.");
     for (int i = 0; i < path.length; i++) {
-      if (currentSchema.isUnion()) {
-        currentSchema = AvroSchemaUtils.getNonNullTypeFromUnion(currentSchema);
-      }
-      Schema.Field field = currentSchema.getField(path[i]);
-      if (field == null) {
+      currentSchema = currentSchema.getNonNullType();
+      Option<HoodieSchemaField> fieldOpt = currentSchema.getField(path[i]);
+      if (fieldOpt.isEmpty()) {
         return null;
       }
-      Object value = currentRecord.get(field.pos());
+      Object value = currentRecord.get(fieldOpt.get().pos());
       if (i == path.length - 1) {
         return value;
       }
-      currentSchema = field.schema();
+      currentSchema = fieldOpt.get().schema();
       currentRecord = (IndexedRecord) value;
     }
    return null;
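For readers following along: with the HoodieSchema API, each segment of the dotted path above resolves to an Option<HoodieSchemaField> instead of a nullable Schema.Field, and union unwrapping collapses to a single getNonNullType() call. A minimal sketch of what one traversal step looks like — the schema literal and field names here are hypothetical, only the API calls come from this patch:

    HoodieSchema tripSchema = HoodieSchema.parse(
        "{\"type\":\"record\",\"name\":\"Trip\",\"fields\":["
            + "{\"name\":\"driver\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Driver\","
            + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}],\"default\":null}]}");

    // Resolving "driver.name": a miss now surfaces as Option.empty()
    // instead of a null Schema.Field.
    Option<HoodieSchemaField> driverField = tripSchema.getField("driver");
    HoodieSchema driverSchema = driverField.get().schema().getNonNullType(); // strips the null branch
    Option<HoodieSchemaField> nameField = driverSchema.getField("name");     // present
    Option<HoodieSchemaField> missing = driverSchema.getField("license");    // Option.empty()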
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index 4614854d4df3..38cd64c13f37 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -101,7 +101,7 @@ public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable>
     jobConf.set(serdeConstants.LIST_COLUMNS, String.join(",", dataColumnNameList));
     List<TypeInfo> columnTypes;
     try {
-      columnTypes = HiveTypeUtils.generateColumnTypes(dataSchema.toAvroSchema());
+      columnTypes = HiveTypeUtils.generateColumnTypes(dataSchema);
     } catch (AvroSerdeException e) {
       throw new HoodieAvroSchemaException(String.format("Failed to generate hive column types from schema: %s, due to %s", dataSchema, e));
     }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
index 8f52b290354b..bc12027921f6 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
@@ -31,7 +31,6 @@ import org.apache.hudi.hadoop.utils.HiveJavaTypeConverter;
 import org.apache.hudi.hadoop.utils.HoodieArrayWritableSchemaUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;

-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -56,9 +55,9 @@ public class HiveRecordContext extends RecordContext<ArrayWritable> {
     return FIELD_ACCESSOR_INSTANCE;
   }

-  private final Map<Schema, HiveAvroSerializer> serializerCache = new ConcurrentHashMap<>();
+  private final Map<HoodieSchema, HiveAvroSerializer> serializerCache = new ConcurrentHashMap<>();

-  private HiveAvroSerializer getHiveAvroSerializer(Schema schema) {
+  private HiveAvroSerializer getHiveAvroSerializer(HoodieSchema schema) {
     return serializerCache.computeIfAbsent(schema, HiveAvroSerializer::new);
   }

@@ -72,7 +71,7 @@ public class HiveRecordContext extends RecordContext<ArrayWritable> {

   @Override
   public Object getValue(ArrayWritable record, HoodieSchema schema, String fieldName) {
-    return getHiveAvroSerializer(schema.toAvroSchema()).getValue(record, fieldName);
+    return getHiveAvroSerializer(schema).getValue(record, fieldName);
   }

   @Override
@@ -92,7 +91,7 @@ public class HiveRecordContext extends RecordContext<ArrayWritable> {
     }
     HoodieSchema schema = getSchemaFromBufferRecord(bufferedRecord);
     ArrayWritable writable = bufferedRecord.getRecord();
-    return new HoodieHiveRecord(key, writable, schema.toAvroSchema(), getHiveAvroSerializer(schema.toAvroSchema()),
+    return new HoodieHiveRecord(key, writable, schema, getHiveAvroSerializer(schema),
         bufferedRecord.getHoodieOperation(), bufferedRecord.getOrderingValue(), bufferedRecord.isDelete());
   }

@@ -143,7 +142,7 @@ public class HiveRecordContext extends RecordContext<ArrayWritable> {

   @Override
   public GenericRecord convertToAvroRecord(ArrayWritable record, HoodieSchema schema) {
-    return getHiveAvroSerializer(schema.toAvroSchema()).serialize(record);
+    return getHiveAvroSerializer(schema).serialize(record);
   }

   @Override
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
index 1ee19c32cecf..a3f5f857dc57 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.read.DeleteContext;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.OrderingValues;
@@ -60,9 +61,9 @@ public class HoodieHiveRecord extends HoodieRecord<ArrayWritable> {

   private final HiveAvroSerializer avroSerializer;

-  protected Schema schema;
+  protected HoodieSchema schema;

-  public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, HiveAvroSerializer avroSerializer) {
+  public HoodieHiveRecord(HoodieKey key, ArrayWritable data, HoodieSchema schema, HiveAvroSerializer avroSerializer) {
     super(key, data);
     this.avroSerializer = avroSerializer;
     this.schema = schema;
@@ -70,7 +71,7 @@ public class HoodieHiveRecord extends HoodieRecord<ArrayWritable> {
     isDelete = data == null;
   }

-  public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, HiveAvroSerializer avroSerializer, HoodieOperation hoodieOperation, Comparable orderingValue, boolean isDelete) {
+  public HoodieHiveRecord(HoodieKey key, ArrayWritable data, HoodieSchema schema, HiveAvroSerializer avroSerializer, HoodieOperation hoodieOperation, Comparable orderingValue, boolean isDelete) {
     super(key, data, hoodieOperation, isDelete, Option.empty());
     this.orderingValue = orderingValue;
     this.avroSerializer = avroSerializer;
@@ -78,7 +79,7 @@ public class HoodieHiveRecord extends HoodieRecord<ArrayWritable> {
     this.copy = false;
   }

-  private HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, HoodieOperation operation, boolean isCopy,
+  private HoodieHiveRecord(HoodieKey key, ArrayWritable data, HoodieSchema schema, HoodieOperation operation, boolean isCopy,
                            HiveAvroSerializer avroSerializer) {
     super(key, data, operation, Option.empty());
     this.schema = schema;
@@ -246,7 +247,7 @@ public class HoodieHiveRecord extends HoodieRecord<ArrayWritable> {
     return avroSerializer.getValue(data, name);
   }

-  protected Schema getSchema() {
+  protected HoodieSchema getSchema() {
     return schema;
   }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index fe0c91f6f3a1..d910145bd464 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -205,7 +205,7 @@ public class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
       // for presto engine, the hiveSchema will be: col1,col2, but the writerSchema will be col1,col2,par
       // so to be compatible with hive and presto, we should rewrite oldRecord before we call combineAndGetUpdateValue,
       // once presto on hudi has its own MOR reader, we can remove the rewrite logic.
-      GenericRecord genericRecord = HiveAvroSerializer.rewriteRecordIgnoreResultCheck(oldRecord, getLogScannerReaderSchema().toAvroSchema());
+      GenericRecord genericRecord = HiveAvroSerializer.rewriteRecordIgnoreResultCheck(oldRecord, getLogScannerReaderSchema());
       RecordContext<IndexedRecord> recordContext = AvroRecordContext.getFieldAccessorInstance();
       BufferedRecord record = BufferedRecords.fromEngineRecord(genericRecord, HoodieSchema.fromAvroSchema(genericRecord.getSchema()), recordContext, orderingFields, newRecord.getRecordKey(), false);
       BufferedRecord newBufferedRecord = BufferedRecords.fromHoodieRecord(newRecord, HoodieSchema.fromAvroSchema(getLogScannerReaderSchema().toAvroSchema()),
@@ -218,7 +218,7 @@ public class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
   }

   private GenericRecord convertArrayWritableToHoodieRecord(ArrayWritable arrayWritable) {
-    GenericRecord record = serializer.serialize(arrayWritable, getHiveSchema().toAvroSchema());
+    GenericRecord record = serializer.serialize(arrayWritable, getHiveSchema());
     return record;
   }

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java
index 16690ac360b5..96210d4b9956 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java
@@ -18,14 +18,16 @@

 package org.apache.hudi.hadoop.utils;

-import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieAvroSchemaException;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.HoodieSchemaException;

-import org.apache.avro.JsonProperties;
-import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericEnumSymbol;
@@ -62,6 +64,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;

@@ -75,17 +78,16 @@ public class HiveAvroSerializer {
   private final List<String> columnNames;
   private final List<TypeInfo> columnTypes;
   private final ArrayWritableObjectInspector objectInspector;
-  private final Schema recordSchema;
+  private final HoodieSchema recordSchema;

   private static final Logger LOG = LoggerFactory.getLogger(HiveAvroSerializer.class);

-  public HiveAvroSerializer(Schema schema) {
-    schema = AvroSchemaUtils.getNonNullTypeFromUnion(schema);
-    if (schema.getType() != Schema.Type.RECORD) {
+  public HiveAvroSerializer(HoodieSchema schema) {
+    if (schema.getNonNullType().getType() != HoodieSchemaType.RECORD) {
       throw new IllegalArgumentException("Expected record schema, but got: " + schema);
     }
     this.recordSchema = schema;
-    this.columnNames = schema.getFields().stream().map(Schema.Field::name).map(String::toLowerCase).collect(Collectors.toList());
+    this.columnNames = schema.getFields().stream().map(HoodieSchemaField::name).map(String::toLowerCase).collect(Collectors.toList());
     try {
       this.columnTypes = HiveTypeUtils.generateColumnTypes(schema);
     } catch (AvroSerdeException e) {
@@ -171,7 +173,7 @@ public class HiveAvroSerializer {
             + "', but got " + context.typeInfo.getTypeName());
       }

-      if (!(context.schema.getType() == Schema.Type.RECORD)) {
+      if (!(context.schema.getType() == HoodieSchemaType.RECORD)) {
         throw new HoodieException("Expected RecordSchema while resolving '" + path[i]
             + "', but got " + context.schema.getType());
       }
@@ -184,15 +186,13 @@ public class HiveAvroSerializer {
   }

   private FieldContext extractFieldFromRecord(ArrayWritable record, StructObjectInspector structObjectInspector,
-                                              List<TypeInfo> fieldTypes, Schema schema, String fieldName) {
-    Schema.Field schemaField = schema.getField(fieldName);
-    if (schemaField == null) {
-      throw new HoodieException("Field '" + fieldName + "' not found in schema: " + schema);
-    }
+                                              List<TypeInfo> fieldTypes, HoodieSchema schema, String fieldName) {
+    HoodieSchemaField schemaField = schema.getField(fieldName)
+        .orElseThrow(() -> new HoodieException("Field '" + fieldName + "' not found in schema: " + schema));
     int fieldIdx = schemaField.pos();
     TypeInfo fieldTypeInfo = fieldTypes.get(fieldIdx);

-    Schema fieldSchema = AvroSchemaUtils.getNonNullTypeFromUnion(schemaField.schema());
+    HoodieSchema fieldSchema = schemaField.schema().getNonNullType();

     StructField structField = structObjectInspector.getStructFieldRef(fieldName);
     if (structField == null) {
@@ -216,9 +216,9 @@ public class HiveAvroSerializer {
     final TypeInfo typeInfo;
     final ObjectInspector objectInspector;
     final Object object;
-    final Schema schema;
+    final HoodieSchema schema;

-    FieldContext(Object object, ObjectInspector objectInspector, TypeInfo typeInfo, Schema schema) {
+    FieldContext(Object object, ObjectInspector objectInspector, TypeInfo typeInfo, HoodieSchema schema) {
       this.object = object;
       this.objectInspector = objectInspector;
       this.typeInfo = typeInfo;
@@ -226,7 +226,7 @@ public class HiveAvroSerializer {
     }
   }

-  private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING);
+  private static final HoodieSchema STRING_SCHEMA = HoodieSchema.create(HoodieSchemaType.STRING);

   public GenericRecord serialize(Object o) {
     if (recordSchema == null) {
@@ -235,10 +235,10 @@ public class HiveAvroSerializer {
     return serialize(o, recordSchema);
   }

-  public GenericRecord serialize(Object o, Schema schema) {
+  public GenericRecord serialize(Object o, HoodieSchema schema) {
     StructObjectInspector soi = objectInspector;

-    GenericData.Record record = new GenericData.Record(schema);
+    GenericData.Record record = new GenericData.Record(schema.toAvroSchema());

     List<? extends StructField> outputFieldRefs = soi.getAllStructFieldRefs();

     if (outputFieldRefs.size() != columnNames.size()) {
@@ -251,7 +251,7 @@ public class HiveAvroSerializer {
     List<Object> structFieldsDataAsList = soi.getStructFieldsDataAsList(o);

     for (int i = 0; i < size; i++) {
-      Schema.Field field = schema.getFields().get(i);
+      HoodieSchemaField field = schema.getFields().get(i);
       if (i >= columnTypes.size()) {
         break;
       }
@@ -268,28 +268,34 @@ public class HiveAvroSerializer {
     return record;
   }

-  private void setUpRecordFieldFromWritable(TypeInfo typeInfo, Object structFieldData, ObjectInspector fieldOI, GenericData.Record record, Schema.Field field) {
+  private void setUpRecordFieldFromWritable(TypeInfo typeInfo, Object structFieldData, ObjectInspector fieldOI, GenericData.Record record, HoodieSchemaField field) {
     Object val = serialize(typeInfo, fieldOI, structFieldData, field.schema());
     if (val == null) {
-      if (field.defaultVal() instanceof JsonProperties.Null) {
+      Option<Object> defaultValOpt = field.defaultVal();
+      // In Avro/HoodieSchema, field.defaultVal() returns:
+      //   - JsonProperties.Null / HoodieSchema.NULL_VALUE if the default is explicitly null
+      //   - null / Option.empty() if the field has no default value at all
+      //   - some value if the field has an actual default
+      if (defaultValOpt.isPresent() && defaultValOpt.get() == HoodieSchema.NULL_VALUE) {
         record.put(field.name(), null);
       } else {
-        record.put(field.name(), field.defaultVal());
+        // no default declared, or a concrete non-null default
+        record.put(field.name(), defaultValOpt.orElse(null));
       }
     } else {
       record.put(field.name(), val);
     }
   }

-  private Object serialize(TypeInfo typeInfo, ObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+  private Object serialize(TypeInfo typeInfo, ObjectInspector fieldOI, Object structFieldData, HoodieSchema schema) throws HoodieException {
     if (null == structFieldData) {
       return null;
     }

-    schema = AvroSchemaUtils.getNonNullTypeFromUnion(schema);
+    schema = schema.getNonNullType();

     /* Because we use Hive's 'string' type when Avro calls for enum, we have to expressly check for enum-ness */
-    if (Schema.Type.ENUM.equals(schema.getType())) {
+    if (HoodieSchemaType.ENUM == schema.getType()) {
       assert fieldOI instanceof PrimitiveObjectInspector;
       return serializeEnum((PrimitiveObjectInspector) fieldOI, structFieldData, schema);
     }
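The three-way default semantics spelled out in the comment above are easy to get wrong, so here is the same decision table as a small standalone helper. This is a hypothetical method, not part of the patch, built only from the Option-based defaultVal() contract used in the hunk:

    // Mirrors the null-value branch of setUpRecordFieldFromWritable.
    static Object resolveDefault(HoodieSchemaField field) {
      Option<Object> defaultValOpt = field.defaultVal();
      if (defaultValOpt.isPresent() && defaultValOpt.get() == HoodieSchema.NULL_VALUE) {
        return null;                 // default explicitly declared as null
      }
      // Option.empty(): no default declared -> null; otherwise the concrete default.
      return defaultValOpt.orElse(null);
    }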
@@ -339,48 +345,48 @@ public class HiveAvroSerializer {
     }
   };

-  private Object serializeEnum(PrimitiveObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+  private Object serializeEnum(PrimitiveObjectInspector fieldOI, Object structFieldData, HoodieSchema schema) throws HoodieException {
     try {
-      return enums.retrieve(schema).retrieve(serializePrimitive(fieldOI, structFieldData, schema));
+      return enums.retrieve(schema.toAvroSchema()).retrieve(serializePrimitive(fieldOI, structFieldData, schema));
     } catch (Exception e) {
       throw new HoodieException(e);
     }
   }

-  private Object serializeStruct(StructTypeInfo typeInfo, StructObjectInspector ssoi, Object o, Schema schema) {
+  private Object serializeStruct(StructTypeInfo typeInfo, StructObjectInspector ssoi, Object o, HoodieSchema schema) {
     int size = schema.getFields().size();
     List<? extends StructField> allStructFieldRefs = ssoi.getAllStructFieldRefs();
     List<Object> structFieldsDataAsList = ssoi.getStructFieldsDataAsList(o);
-    GenericData.Record record = new GenericData.Record(schema);
+    GenericData.Record record = new GenericData.Record(schema.toAvroSchema());
     ArrayList<TypeInfo> allStructFieldTypeInfos = typeInfo.getAllStructFieldTypeInfos();

     for (int i = 0; i < size; i++) {
-      Schema.Field field = schema.getFields().get(i);
+      HoodieSchemaField field = schema.getFields().get(i);
       setUpRecordFieldFromWritable(allStructFieldTypeInfos.get(i), structFieldsDataAsList.get(i),
           allStructFieldRefs.get(i).getFieldObjectInspector(), record, field);
     }
     return record;
   }

-  private Object serializePrimitive(PrimitiveObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+  private Object serializePrimitive(PrimitiveObjectInspector fieldOI, Object structFieldData, HoodieSchema schema) throws HoodieException {
     switch (fieldOI.getPrimitiveCategory()) {
       case BINARY:
-        if (schema.getType() == Schema.Type.BYTES) {
+        if (schema.getType() == HoodieSchemaType.BYTES) {
           return AvroSerdeUtils.getBufferFromBytes((byte[]) fieldOI.getPrimitiveJavaObject(structFieldData));
-        } else if (schema.getType() == Schema.Type.FIXED) {
-          GenericData.Fixed fixed = new GenericData.Fixed(schema, (byte[]) fieldOI.getPrimitiveJavaObject(structFieldData));
+        } else if (schema.getType() == HoodieSchemaType.FIXED) {
+          GenericData.Fixed fixed = new GenericData.Fixed(schema.toAvroSchema(), (byte[]) fieldOI.getPrimitiveJavaObject(structFieldData));
           return fixed;
         } else {
           throw new HoodieException("Unexpected Avro schema for Binary TypeInfo: " + schema.getType());
         }
       case DECIMAL:
         HiveDecimal dec = (HiveDecimal) fieldOI.getPrimitiveJavaObject(structFieldData);
-        LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) schema.getLogicalType();
+        HoodieSchema.Decimal decimal = (HoodieSchema.Decimal) schema;
         BigDecimal bd = new BigDecimal(dec.toString()).setScale(decimal.getScale());
-        if (schema.getType() == Schema.Type.BYTES) {
-          return HoodieAvroUtils.DECIMAL_CONVERSION.toBytes(bd, schema, decimal);
+        if (schema.getType() == HoodieSchemaType.BYTES) {
+          return HoodieAvroUtils.DECIMAL_CONVERSION.toBytes(bd, schema.toAvroSchema(), decimal.toAvroSchema().getLogicalType());
         } else {
-          return HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, schema, decimal);
+          return HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, schema.toAvroSchema(), decimal.toAvroSchema().getLogicalType());
         }
       case CHAR:
         HiveChar ch = (HiveChar) fieldOI.getPrimitiveJavaObject(structFieldData);
@@ -396,7 +402,7 @@ public class HiveAvroSerializer {
       case TIMESTAMP:
         return HoodieHiveUtils.getMills(structFieldData);
       case INT:
-        if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("date")) {
+        if (schema.getType() != null && schema.getType() == HoodieSchemaType.DATE) {
          return new WritableDateObjectInspector().getPrimitiveWritableObject(structFieldData).getDays();
        }
        return fieldOI.getPrimitiveJavaObject(structFieldData);
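Worth noting on the DECIMAL branch above: precision and scale now live on the HoodieSchema.Decimal subtype itself, while Avro's conversion utilities still expect the Avro view, hence the toAvroSchema() bridge calls. A rough sketch with hypothetical values (DECIMAL_CONVERSION is the existing converter in HoodieAvroUtils, as used in the hunk):

    HoodieSchema.Decimal decimal = (HoodieSchema.Decimal) schema;   // e.g. a decimal(10, 4) column
    BigDecimal bd = new BigDecimal("123.4567").setScale(decimal.getScale());
    // Fixed-backed decimals go through the Avro schema and its logical type:
    GenericFixed fixed = HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(
        bd, decimal.toAvroSchema(), decimal.toAvroSchema().getLogicalType());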
@@ -409,7 +415,7 @@
     }
   }

-  private Object serializeUnion(UnionTypeInfo typeInfo, UnionObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+  private Object serializeUnion(UnionTypeInfo typeInfo, UnionObjectInspector fieldOI, Object structFieldData, HoodieSchema schema) throws HoodieException {
     byte tag = fieldOI.getTag(structFieldData);

     // Invariant that Avro's tag ordering must match Hive's.
@@ -419,20 +425,20 @@ public class HiveAvroSerializer {
         schema.getTypes().get(tag));
   }

-  private Object serializeList(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+  private Object serializeList(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, HoodieSchema schema) throws HoodieException {
     List<?> list = fieldOI.getList(structFieldData);
-    List<Object> deserialized = new GenericData.Array<Object>(list.size(), schema);
+    List<Object> deserialized = new GenericData.Array<>(list.size(), schema.toAvroSchema());

     TypeInfo listElementTypeInfo = typeInfo.getListElementTypeInfo();
     ObjectInspector listElementObjectInspector = fieldOI.getListElementObjectInspector();

     // NOTE: We have to resolve nullable schema, since Avro permits array elements
     //       to be null
-    Schema arrayNestedType = AvroSchemaUtils.getNonNullTypeFromUnion(schema.getElementType());
-    Schema elementType;
+    HoodieSchema arrayNestedType = schema.getElementType().getNonNullType();
+    HoodieSchema elementType;
     if (listElementObjectInspector.getCategory() == ObjectInspector.Category.PRIMITIVE) {
       elementType = arrayNestedType;
     } else {
-      elementType = arrayNestedType.getField("element") == null ? arrayNestedType : arrayNestedType.getField("element").schema();
+      elementType = arrayNestedType.getField("element").isEmpty() ? arrayNestedType : arrayNestedType.getField("element").get().schema();
     }
     for (int i = 0; i < list.size(); i++) {
       Object childFieldData = list.get(i);
@@ -445,7 +451,7 @@
     return deserialized;
   }

-  private Object serializeMap(MapTypeInfo typeInfo, MapObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+  private Object serializeMap(MapTypeInfo typeInfo, MapObjectInspector fieldOI, Object structFieldData, HoodieSchema schema) throws HoodieException {
     // Avro only allows maps with string keys
     if (!mapHasStringKey(fieldOI.getMapKeyObjectInspector())) {
       throw new HoodieException("Avro only supports maps with keys as Strings. Current Map is: " + typeInfo.toString());
     }
@@ -456,7 +462,7 @@
     TypeInfo mapKeyTypeInfo = typeInfo.getMapKeyTypeInfo();
     TypeInfo mapValueTypeInfo = typeInfo.getMapValueTypeInfo();
     Map<?, ?> map = fieldOI.getMap(structFieldData);
-    Schema valueType = schema.getValueType();
+    HoodieSchema valueType = schema.getValueType();

     Map<Object, Object> deserialized = new LinkedHashMap<Object, Object>(fieldOI.getMapSize(structFieldData));

@@ -473,10 +479,10 @@
         && ((PrimitiveObjectInspector) mapKeyObjectInspector).getPrimitiveCategory().equals(PrimitiveObjectInspector.PrimitiveCategory.STRING);
   }

-  public static GenericRecord rewriteRecordIgnoreResultCheck(GenericRecord oldRecord, Schema newSchema) {
-    GenericRecord newRecord = new GenericData.Record(newSchema);
+  public static GenericRecord rewriteRecordIgnoreResultCheck(GenericRecord oldRecord, HoodieSchema newSchema) {
+    GenericRecord newRecord = new GenericData.Record(newSchema.toAvroSchema());
     boolean isSpecificRecord = oldRecord instanceof SpecificRecordBase;
-    for (Schema.Field f : newSchema.getFields()) {
+    for (HoodieSchemaField f : newSchema.getFields()) {
       if (!(isSpecificRecord && isMetadataField(f.name()))) {
         copyOldValueOrSetDefault(oldRecord, newRecord, f);
       }
@@ -484,7 +490,7 @@
     return newRecord;
   }

-  private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field field) {
+  private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, HoodieSchemaField field) {
     Schema oldSchema = oldRecord.getSchema();
     Object fieldValue = oldSchema.getField(field.name()) == null ? null : oldRecord.get(field.name());

@@ -493,12 +499,16 @@
       Object newFieldValue;
       if (fieldValue instanceof GenericRecord) {
         GenericRecord record = (GenericRecord) fieldValue;
-        newFieldValue = rewriteRecordIgnoreResultCheck(record, AvroSchemaUtils.resolveUnionSchema(field.schema(), record.getSchema().getFullName()));
+        HoodieSchema nonNullFieldSchema = field.schema().getNonNullType();
+        if (!Objects.equals(nonNullFieldSchema.getFullName(), record.getSchema().getFullName())) {
+          throw new HoodieSchemaException(String.format("Unsupported UNION type %s: Only UNION of a null type and a non-null type is supported", field.schema()));
+        }
+        newFieldValue = rewriteRecordIgnoreResultCheck(record, nonNullFieldSchema);
       } else {
         newFieldValue = fieldValue;
       }
       newRecord.put(field.name(), newFieldValue);
-    } else if (field.defaultVal() instanceof JsonProperties.Null) {
+    } else if (field.defaultVal().isPresent() && field.defaultVal().get() == HoodieSchema.NULL_VALUE) {
       newRecord.put(field.name(), null);
     } else {
-      newRecord.put(field.name(), field.defaultVal());
+      newRecord.put(field.name(), field.defaultVal().orElse(null));
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveTypeUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveTypeUtils.java
index a5383b63ebb6..bcea1d628de7 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveTypeUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveTypeUtils.java
@@ -18,17 +18,17 @@

 package org.apache.hudi.hadoop.utils;

-import org.apache.hudi.avro.AvroSchemaUtils;
-
-import static org.apache.avro.Schema.Type.BOOLEAN;
-import static org.apache.avro.Schema.Type.BYTES;
-import static org.apache.avro.Schema.Type.DOUBLE;
-import static org.apache.avro.Schema.Type.FIXED;
-import static org.apache.avro.Schema.Type.FLOAT;
-import static org.apache.avro.Schema.Type.INT;
-import static org.apache.avro.Schema.Type.LONG;
-import static org.apache.avro.Schema.Type.NULL;
-import static org.apache.avro.Schema.Type.STRING;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.ValidationUtils;
+
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
+import org.apache.hadoop.hive.serde2.avro.InstanceCache;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

 import java.util.ArrayList;
 import java.util.Collections;
@@ -38,13 +38,22 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;

-import org.apache.avro.Schema;
-import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
-import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
-import org.apache.hadoop.hive.serde2.avro.InstanceCache;
-import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import static org.apache.hudi.common.schema.HoodieSchemaType.ARRAY;
+import static org.apache.hudi.common.schema.HoodieSchemaType.BOOLEAN;
+import static org.apache.hudi.common.schema.HoodieSchemaType.BYTES;
+import static org.apache.hudi.common.schema.HoodieSchemaType.DATE;
+import static org.apache.hudi.common.schema.HoodieSchemaType.DECIMAL;
+import static org.apache.hudi.common.schema.HoodieSchemaType.DOUBLE;
+import static org.apache.hudi.common.schema.HoodieSchemaType.ENUM;
+import static org.apache.hudi.common.schema.HoodieSchemaType.FIXED;
+import static org.apache.hudi.common.schema.HoodieSchemaType.FLOAT;
+import static org.apache.hudi.common.schema.HoodieSchemaType.INT;
+import static org.apache.hudi.common.schema.HoodieSchemaType.LONG;
+import static org.apache.hudi.common.schema.HoodieSchemaType.MAP;
+import static org.apache.hudi.common.schema.HoodieSchemaType.NULL;
+import static org.apache.hudi.common.schema.HoodieSchemaType.RECORD;
+import static org.apache.hudi.common.schema.HoodieSchemaType.STRING;
+import static org.apache.hudi.common.schema.HoodieSchemaType.UNION;

 /**
  * Convert an Avro Schema to a Hive TypeInfo
  */
@@ -66,9 +75,9 @@ public class HiveTypeUtils {
   // smallint

   // Map of Avro's primitive types to Hive's (for those that are supported by both)
-  private static final Map<Schema.Type, TypeInfo> PRIMITIVE_TYPE_TO_TYPE_INFO = initTypeMap();
-  private static Map<Schema.Type, TypeInfo> initTypeMap() {
-    Map<Schema.Type, TypeInfo> theMap = new Hashtable<Schema.Type, TypeInfo>();
+  private static final Map<HoodieSchemaType, TypeInfo> PRIMITIVE_TYPE_TO_TYPE_INFO = initTypeMap();
+  private static Map<HoodieSchemaType, TypeInfo> initTypeMap() {
+    Map<HoodieSchemaType, TypeInfo> theMap = new Hashtable<>();
     theMap.put(NULL, TypeInfoFactory.getPrimitiveTypeInfo("void"));
     theMap.put(BOOLEAN, TypeInfoFactory.getPrimitiveTypeInfo("boolean"));
     theMap.put(INT, TypeInfoFactory.getPrimitiveTypeInfo("int"));
@@ -90,7 +99,7 @@ public class HiveTypeUtils {
    * from the schema.
    * @throws AvroSerdeException for problems during conversion.
    */
-  public static List<TypeInfo> generateColumnTypes(Schema schema) throws AvroSerdeException {
+  public static List<TypeInfo> generateColumnTypes(HoodieSchema schema) throws AvroSerdeException {
     return generateColumnTypes(schema, null);
   }

@@ -105,23 +114,23 @@ public class HiveTypeUtils {
    * from the schema.
    * @throws AvroSerdeException for problems during conversion.
    */
-  public static List<TypeInfo> generateColumnTypes(Schema schema,
-                                                   Set<Schema> seenSchemas) throws AvroSerdeException {
-    List<Schema.Field> fields = schema.getFields();
+  public static List<TypeInfo> generateColumnTypes(HoodieSchema schema,
+                                                   Set<HoodieSchema> seenSchemas) throws AvroSerdeException {
+    List<HoodieSchemaField> fields = schema.getFields();

     List<TypeInfo> types = new ArrayList<TypeInfo>(fields.size());

-    for (Schema.Field field : fields) {
+    for (HoodieSchemaField field : fields) {
       types.add(generateTypeInfo(field.schema(), seenSchemas));
     }

     return types;
   }

-  static InstanceCache<Schema, TypeInfo> typeInfoCache = new InstanceCache<Schema, TypeInfo>() {
+  static InstanceCache<HoodieSchema, TypeInfo> typeInfoCache = new InstanceCache<HoodieSchema, TypeInfo>() {
     @Override
-    protected TypeInfo makeInstance(Schema s,
-                                    Set<Schema> seenSchemas)
+    protected TypeInfo makeInstance(HoodieSchema s,
+                                    Set<HoodieSchema> seenSchemas)
         throws AvroSerdeException {
       return generateTypeInfoWorker(s, seenSchemas);
     }
@@ -134,33 +143,25 @@ public class HiveTypeUtils {
    * @return TypeInfo matching the Avro schema
    * @throws AvroSerdeException for any problems during conversion.
    */
-  public static TypeInfo generateTypeInfo(Schema schema,
-                                          Set<Schema> seenSchemas) throws AvroSerdeException {
+  public static TypeInfo generateTypeInfo(HoodieSchema schema,
+                                          Set<HoodieSchema> seenSchemas) throws AvroSerdeException {
     // For bytes type, it can be mapped to decimal.
-    Schema.Type type = schema.getType();
-    // HUDI MODIFICATION ADDED "|| type == FIXED"
-    if ((type == BYTES || type == FIXED) && AvroSerDe.DECIMAL_TYPE_NAME
-        .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
-      int precision = 0;
-      int scale = 0;
-      try {
-        precision = getIntValue(schema.getObjectProp(AvroSerDe.AVRO_PROP_PRECISION));
-        scale = getIntValue(schema.getObjectProp(AvroSerDe.AVRO_PROP_SCALE));
-      } catch (Exception ex) {
-        throw new AvroSerdeException("Failed to obtain scale value from file schema: " + schema, ex);
-      }
-
+    HoodieSchemaType type = schema.getType();
+    if (type == DECIMAL && AvroSerDe.DECIMAL_TYPE_NAME
+        .equalsIgnoreCase((String) schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+      HoodieSchema.Decimal decimalSchema = (HoodieSchema.Decimal) schema;
+      int precision = decimalSchema.getPrecision();
+      int scale = decimalSchema.getScale();
       try {
         HiveDecimalUtils.validateParameter(precision, scale);
       } catch (Exception ex) {
         throw new AvroSerdeException("Invalid precision or scale for decimal type", ex);
       }
-
       return TypeInfoFactory.getDecimalTypeInfo(precision, scale);
     }

     if (type == STRING
-        && AvroSerDe.CHAR_TYPE_NAME.equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+        && AvroSerDe.CHAR_TYPE_NAME.equalsIgnoreCase((String) schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
       int maxLength = 0;
       try {
         maxLength = getIntFromSchema(schema, AvroSerDe.AVRO_PROP_MAX_LENGTH);
@@ -171,7 +172,7 @@ public class HiveTypeUtils {
     }

     if (type == STRING && AvroSerDe.VARCHAR_TYPE_NAME
-        .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+        .equalsIgnoreCase((String) schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
       int maxLength = 0;
       try {
         maxLength = getIntFromSchema(schema, AvroSerDe.AVRO_PROP_MAX_LENGTH);
@@ -181,13 +182,13 @@ public class HiveTypeUtils {
       return TypeInfoFactory.getVarcharTypeInfo(maxLength);
     }

-    if (type == INT
-        && AvroSerDe.DATE_TYPE_NAME.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+    if (type == DATE
+        && AvroSerDe.DATE_TYPE_NAME.equals((String) schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
       return TypeInfoFactory.dateTypeInfo;
     }

     if (type == LONG
-        && AvroSerDe.TIMESTAMP_TYPE_NAME.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+        && AvroSerDe.TIMESTAMP_TYPE_NAME.equals((String) schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
       return TypeInfoFactory.timestampTypeInfo;
     }

@@ -224,8 +225,8 @@ public class HiveTypeUtils {
   }

   // added this from AvroSerdeUtils in hive latest
-  public static int getIntFromSchema(Schema schema, String name) {
-    Object obj = schema.getObjectProp(name);
+  public static int getIntFromSchema(HoodieSchema schema, String name) {
+    Object obj = schema.getProp(name);
     if (obj instanceof String) {
       return Integer.parseInt((String) obj);
     } else if (obj instanceof Integer) {
@@ -236,15 +237,15 @@ public class HiveTypeUtils {
     }
   }

-  private static TypeInfo generateTypeInfoWorker(Schema schema,
-                                                 Set<Schema> seenSchemas) throws AvroSerdeException {
-    // Avro requires NULLable types to be defined as unions of some type T
+  private static TypeInfo generateTypeInfoWorker(HoodieSchema schema,
+                                                 Set<HoodieSchema> seenSchemas) throws AvroSerdeException {
+    // HoodieSchema requires NULLable types to be defined as unions of some type T
     // and NULL. This is annoying and we're going to hide it from the user.
-    if (AvroSchemaUtils.isNullable(schema)) {
-      return generateTypeInfo(AvroSchemaUtils.getNonNullTypeFromUnion(schema), seenSchemas);
+    if (schema.isNullable()) {
+      return generateTypeInfo(schema.getNonNullType(), seenSchemas);
     }

-    Schema.Type type = schema.getType();
+    HoodieSchemaType type = schema.getType();
     if (PRIMITIVE_TYPE_TO_TYPE_INFO.containsKey(type)) {
       return PRIMITIVE_TYPE_TO_TYPE_INFO.get(type);
     }
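The effect of the nullable-union shortcut above, end to end: a ["null","string"] schema reports isNullable() == true, the non-null branch drives the mapping, and the union wrapper never reaches Hive. A quick sketch — the schema literal is hypothetical; the calls are the ones in this file:

    HoodieSchema nullableString = HoodieSchema.parse("[\"null\",\"string\"]");
    // generateTypeInfoWorker sees isNullable() and recurses on getNonNullType(),
    // so the resulting TypeInfo is plain Hive "string".
    TypeInfo ti = HiveTypeUtils.generateTypeInfo(nullableString, null);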
@@ -259,12 +260,12 @@ public class HiveTypeUtils {
     }
   }

-  private static TypeInfo generateRecordTypeInfo(Schema schema,
-                                                 Set<Schema> seenSchemas) throws AvroSerdeException {
-    assert schema.getType().equals(Schema.Type.RECORD);
+  private static TypeInfo generateRecordTypeInfo(HoodieSchema schema,
+                                                 Set<HoodieSchema> seenSchemas) throws AvroSerdeException {
+    ValidationUtils.checkArgument(schema.getType() == RECORD, schema + " is not a RECORD");

     if (seenSchemas == null) {
-      seenSchemas = Collections.newSetFromMap(new IdentityHashMap<Schema, Boolean>());
+      seenSchemas = Collections.newSetFromMap(new IdentityHashMap<>());
     } else if (seenSchemas.contains(schema)) {
       throw new AvroSerdeException(
           "Recursive schemas are not supported. Recursive schema was " + schema
@@ -272,9 +273,9 @@ public class HiveTypeUtils {
     }
     seenSchemas.add(schema);

-    List<Schema.Field> fields = schema.getFields();
-    List<String> fieldNames = new ArrayList<String>(fields.size());
-    List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(fields.size());
+    List<HoodieSchemaField> fields = schema.getFields();
+    List<String> fieldNames = new ArrayList<>(fields.size());
+    List<TypeInfo> typeInfos = new ArrayList<>(fields.size());

     for (int i = 0; i < fields.size(); i++) {
       fieldNames.add(i, fields.get(i).name());
@@ -288,33 +289,32 @@ public class HiveTypeUtils {
    * Generate a TypeInfo for an Avro Map. This is made slightly simpler in that
    * Avro only allows maps with strings for keys.
    */
-  private static TypeInfo generateMapTypeInfo(Schema schema,
-                                              Set<Schema> seenSchemas) throws AvroSerdeException {
-    assert schema.getType().equals(Schema.Type.MAP);
-    Schema valueType = schema.getValueType();
+  private static TypeInfo generateMapTypeInfo(HoodieSchema schema,
+                                              Set<HoodieSchema> seenSchemas) throws AvroSerdeException {
+    ValidationUtils.checkArgument(schema.getType() == MAP, schema + " is not MAP");
+    HoodieSchema valueType = schema.getValueType();
     TypeInfo ti = generateTypeInfo(valueType, seenSchemas);

     return TypeInfoFactory.getMapTypeInfo(TypeInfoFactory.getPrimitiveTypeInfo("string"), ti);
   }

-  private static TypeInfo generateArrayTypeInfo(Schema schema,
-                                                Set<Schema> seenSchemas) throws AvroSerdeException {
-    assert schema.getType().equals(Schema.Type.ARRAY);
-    Schema itemsType = schema.getElementType();
+  private static TypeInfo generateArrayTypeInfo(HoodieSchema schema,
+                                                Set<HoodieSchema> seenSchemas) throws AvroSerdeException {
+    ValidationUtils.checkArgument(schema.getType() == ARRAY, schema + " is not an ARRAY");
+    HoodieSchema itemsType = schema.getElementType();
     TypeInfo itemsTypeInfo = generateTypeInfo(itemsType, seenSchemas);

     return TypeInfoFactory.getListTypeInfo(itemsTypeInfo);
   }

-  private static TypeInfo generateUnionTypeInfo(Schema schema,
-                                                Set<Schema> seenSchemas) throws AvroSerdeException {
-    assert schema.getType().equals(Schema.Type.UNION);
-    List<Schema> types = schema.getTypes();
-
+  private static TypeInfo generateUnionTypeInfo(HoodieSchema schema,
+                                                Set<HoodieSchema> seenSchemas) throws AvroSerdeException {
+    ValidationUtils.checkArgument(schema.getType() == UNION, schema + " is not a UNION");
+    List<HoodieSchema> types = schema.getTypes();

-    List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(types.size());
+    List<TypeInfo> typeInfos = new ArrayList<>(types.size());

-    for (Schema type : types) {
+    for (HoodieSchema type : types) {
       typeInfos.add(generateTypeInfo(type, seenSchemas));
     }

@@ -324,8 +324,8 @@ public class HiveTypeUtils {
   // Hive doesn't have an Enum type, so we're going to treat them as Strings.
   // During the deserialize/serialize stage we'll check for enumness and
   // convert as such.
-  private static TypeInfo generateEnumTypeInfo(Schema schema) {
-    assert schema.getType().equals(Schema.Type.ENUM);
+  private static TypeInfo generateEnumTypeInfo(HoodieSchema schema) {
+    ValidationUtils.checkArgument(schema.getType() == ENUM, schema + " is not an ENUM");

     return TypeInfoFactory.getPrimitiveTypeInfo("string");
   }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
index d3c8e186551e..7a6337778407 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
@@ -27,6 +27,9 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.hadoop.utils.HiveAvroSerializer;

 import org.junit.jupiter.api.BeforeEach;
@@ -50,9 +53,9 @@ class TestHoodieHiveRecord {
     // Create a minimal HoodieHiveRecord instance with mocked dependencies
     HoodieKey key = new HoodieKey("test-key", "test-partition");
     ArrayWritable data = new ArrayWritable(Writable.class, new Writable[]{new Text("test")});
-    Schema schema = Schema.createRecord("TestRecord", null, null, false);
-    schema.setFields(Collections.singletonList(new Schema.Field("testField", Schema.create(Schema.Type.STRING), null, null)));
-
+    HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, false,
+        Collections.singletonList(HoodieSchemaField.of("testField", HoodieSchema.create(HoodieSchemaType.STRING), null, null)));
+
     // Create HoodieHiveRecord with mocked dependencies
     hoodieHiveRecord = new HoodieHiveRecord(key, data, schema, new HiveAvroSerializer(schema));
   }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java
index d22093c24223..16a237fe1f0e 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java
@@ -19,10 +19,9 @@
 package org.apache.hudi.hadoop.utils;

 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.exception.HoodieException;

-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -84,24 +83,24 @@ public class TestHiveAvroSerializer {

   @Test
   public void testSerialize() {
-    Schema avroSchema = new Schema.Parser().parse(SIMPLE_SCHEMA);
-    // create a test record with avroSchema
-    GenericData.Record avroRecord = new GenericData.Record(avroSchema);
+    HoodieSchema schema = HoodieSchema.parse(SIMPLE_SCHEMA);
+    // create a test record with schema
+    GenericData.Record avroRecord = new GenericData.Record(schema.toAvroSchema());
     avroRecord.put("id", 1);
     avroRecord.put("col1", 1000L);
     avroRecord.put("col2", -5.001f);
     avroRecord.put("col3", 12.999d);
-    Schema currentDecimalType = avroSchema.getField("col4").schema().getTypes().get(1);
-    BigDecimal bd = new BigDecimal("123.456").setScale(((LogicalTypes.Decimal) currentDecimalType.getLogicalType()).getScale());
-    avroRecord.put("col4", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, currentDecimalType, currentDecimalType.getLogicalType()));
+    HoodieSchema.Decimal currentDecimalType = (HoodieSchema.Decimal) schema.getField("col4").get().schema().getTypes().get(1);
+    BigDecimal bd = new BigDecimal("123.456").setScale(currentDecimalType.getScale());
+    avroRecord.put("col4", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, currentDecimalType.toAvroSchema(), currentDecimalType.toAvroSchema().getLogicalType()));
     avroRecord.put("col5", "2011-01-01");
     avroRecord.put("col6", 18987);
     avroRecord.put("col7", 1640491505111222L);
     avroRecord.put("col8", false);
     ByteBuffer bb = ByteBuffer.wrap(new byte[]{97, 48, 53});
     avroRecord.put("col9", bb);
-    assertTrue(GenericData.get().validate(avroSchema, avroRecord));
-    ArrayWritable writable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, avroSchema, true);
+    assertTrue(GenericData.get().validate(schema.toAvroSchema(), avroRecord));
+    ArrayWritable writable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, schema.toAvroSchema(), true);

     List<Writable> writableList = Arrays.stream(writable.get()).collect(Collectors.toList());
     writableList.remove(writableList.size() - 1);
@@ -110,20 +109,20 @@ public class TestHiveAvroSerializer {
     List<TypeInfo> columnTypeList = createHiveTypeInfoFrom("int,bigint,float,double,decimal(10,4),string,date,timestamp,boolean,binary,date");
     List<String> columnNameList = createHiveColumnsFrom("id,col1,col2,col3,col4,col5,col6,col7,col8,col9,par");
     StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
-    GenericRecord testRecord = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfo), columnNameList, columnTypeList).serialize(writable, avroSchema);
-    assertTrue(GenericData.get().validate(avroSchema, testRecord));
+    GenericRecord testRecord = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfo), columnNameList, columnTypeList).serialize(writable, schema);
+    assertTrue(GenericData.get().validate(schema.toAvroSchema(), testRecord));

     // test
     List<TypeInfo> columnTypeListClip = createHiveTypeInfoFrom("int,bigint,float,double,decimal(10,4),string,date,timestamp,boolean,binary");
     List<String> columnNameListClip = createHiveColumnsFrom("id,col1,col2,col3,col4,col5,col6,col7,col8,col9");
     StructTypeInfo rowTypeInfoClip = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameListClip, columnTypeListClip);
-    GenericRecord testRecordClip = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfoClip), columnNameListClip, columnTypeListClip).serialize(clipWritable, avroSchema);
-    assertTrue(GenericData.get().validate(avroSchema, testRecordClip));
+    GenericRecord testRecordClip = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfoClip), columnNameListClip, columnTypeListClip).serialize(clipWritable, schema);
+    assertTrue(GenericData.get().validate(schema.toAvroSchema(), testRecordClip));
   }

   @Test
   public void testNestedValueSerialize() {
-    Schema nestedSchema = new Schema.Parser().parse(NESTED_SCHEMA);
-    GenericRecord avroRecord = new GenericData.Record(nestedSchema);
+    HoodieSchema nestedSchema = HoodieSchema.parse(NESTED_SCHEMA);
+    GenericRecord avroRecord = new GenericData.Record(nestedSchema.toAvroSchema());
     avroRecord.put("firstname", "person1");
     avroRecord.put("lastname", "person2");
     GenericArray scores = new GenericData.Array<>(avroRecord.getSchema().getField("scores").schema(), Arrays.asList(1,2));
@@ -136,14 +135,14 @@ public class TestHiveAvroSerializer {
     GenericArray teachers = new GenericData.Array<>(avroRecord.getSchema().getField("teachers").schema(), Arrays.asList(studentRecord));
     avroRecord.put("teachers", teachers);

-    assertTrue(GenericData.get().validate(nestedSchema, avroRecord));
-    ArrayWritable writable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, nestedSchema, true);
+    assertTrue(GenericData.get().validate(nestedSchema.toAvroSchema(), avroRecord));
+    ArrayWritable writable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, nestedSchema.toAvroSchema(), true);

     List<TypeInfo> columnTypeList = createHiveTypeInfoFrom("string,string,array<int>,struct<firstname:string,lastname:string>,array<struct<firstname:string,lastname:string>>");
     List<String> columnNameList = createHiveColumnsFrom("firstname,lastname,arrayRecord,student,teachers");
     StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
     GenericRecord testRecord = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfo), columnNameList, columnTypeList).serialize(writable, nestedSchema);
-    assertTrue(GenericData.get().validate(nestedSchema, testRecord));
+    assertTrue(GenericData.get().validate(nestedSchema.toAvroSchema(), testRecord));
   }

   private List<String> createHiveColumnsFrom(final String columnNamesStr) {
@@ -198,7 +197,7 @@ public class TestHiveAvroSerializer {

   @Test
   public void testGetTopLevelFields() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);

     ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -216,7 +215,7 @@ public class TestHiveAvroSerializer {

   @Test
   public void testGetNestedFields() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);

     ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -234,7 +233,7 @@ public class TestHiveAvroSerializer {

   @Test
   public void testInvalidFieldNameThrows() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);

     ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -257,7 +256,7 @@ public class TestHiveAvroSerializer {

   @Test
   public void testGetValueFromArrayOrMap() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_ARRAY_AND_MAP);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_ARRAY_AND_MAP);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);

     ArrayWritable tagsArray = new ArrayWritable(Text.class, new Text[]{
@@ -298,7 +297,7 @@ public class TestHiveAvroSerializer {

   @Test
   public void testGetJavaTopLevelFields() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);

     ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -316,7 +315,7 @@ public class TestHiveAvroSerializer {

   @Test
   public void testGetJavaNestedFields() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);

     ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -334,7 +333,7 @@ public class TestHiveAvroSerializer {

   @Test
   public void testGetJavaArrayAndMap() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_ARRAY_AND_MAP);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_ARRAY_AND_MAP);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);

     ArrayWritable tagsArray = new ArrayWritable(Text.class, new Text[]{
@@ -381,7 +380,7 @@ public class TestHiveAvroSerializer {

   @Test
   public void testGetJavaInvalidFieldAccess() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_ARRAY_AND_MAP);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_ARRAY_AND_MAP);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);

     ArrayWritable tagsArray = new ArrayWritable(Text.class, new Text[]{
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
index 1c5a9575ecd5..026a0cdbea61 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
@@ -80,14 +80,14 @@ public class TestHoodieArrayWritableSchemaUtils {

     //We reuse the ArrayWritable, so we need to get the values before projecting
     ArrayWritable record = convertArrayWritable(dataGen.generateGenericRecord());
-    HiveAvroSerializer fromSerializer = new HiveAvroSerializer(from.toAvroSchema());
+    HiveAvroSerializer fromSerializer = new HiveAvroSerializer(from);
     Object tripType = fromSerializer.getValue(record, "trip_type");
     Object currentTs = fromSerializer.getValue(record, "current_ts");
     Object weight = fromSerializer.getValue(record, "weight");

     //Make sure the projected fields can be read
     ArrayWritable projectedRecord = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(record, from, to, Collections.emptyMap());
-    HiveAvroSerializer toSerializer = new HiveAvroSerializer(to.toAvroSchema());
+    HiveAvroSerializer toSerializer = new HiveAvroSerializer(to);
     assertEquals(tripType, toSerializer.getValue(projectedRecord, "trip_type"));
     assertEquals(currentTs, toSerializer.getValue(projectedRecord, "current_ts"));
     assertEquals(weight, toSerializer.getValue(projectedRecord, "weight"));
@@ -320,8 +320,8 @@ public class TestHoodieArrayWritableSchemaUtils {
                                             Writable newWritable,
                                             HoodieSchema newSchema
   ) throws AvroSerdeException {
-    TypeInfo oldTypeInfo = HiveTypeUtils.generateTypeInfo(oldSchema.toAvroSchema(), Collections.emptySet());
-    TypeInfo newTypeInfo = HiveTypeUtils.generateTypeInfo(newSchema.toAvroSchema(), Collections.emptySet());
+    TypeInfo oldTypeInfo = HiveTypeUtils.generateTypeInfo(oldSchema, Collections.emptySet());
+    TypeInfo newTypeInfo = HiveTypeUtils.generateTypeInfo(newSchema, Collections.emptySet());
     ObjectInspector oldObjectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(oldTypeInfo);
     ObjectInspector newObjectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(newTypeInfo);
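Taken together, the call-site pattern after this change is: construct everything against HoodieSchema and bridge to Avro only at the very edge. A condensed usage sketch mirroring testSerialize above (SIMPLE_SCHEMA and writable stand in for the test fixtures in that file):

    HoodieSchema schema = HoodieSchema.parse(SIMPLE_SCHEMA);
    HiveAvroSerializer serializer = new HiveAvroSerializer(schema);   // no toAvroSchema() needed here
    GenericRecord avro = serializer.serialize(writable, schema);      // Writable -> Avro record
    // Only validation against vanilla Avro still crosses the bridge:
    assertTrue(GenericData.get().validate(schema.toAvroSchema(), avro));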
