This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch phase-18-HoodieAvroUtils-removal-p2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit bea2dec72b3ffcb5d28686066b8bf0ea0181e31c
Author: voon <[email protected]>
AuthorDate: Fri Dec 12 18:30:24 2025 +0800

    Remove AvroSchemaUtils from HiveAvroSerializer and HiveTypeUtils
---
 .../hadoop/TestHoodieFileGroupReaderOnHive.java    |   2 +-
 .../org/apache/hudi/avro/AvroRecordContext.java    |  16 +-
 .../hudi/hadoop/HiveHoodieReaderContext.java       |   2 +-
 .../org/apache/hudi/hadoop/HiveRecordContext.java  |  11 +-
 .../org/apache/hudi/hadoop/HoodieHiveRecord.java   |  11 +-
 .../realtime/RealtimeCompactedRecordReader.java    |   4 +-
 .../hudi/hadoop/utils/HiveAvroSerializer.java      | 120 ++++++++-------
 .../apache/hudi/hadoop/utils/HiveTypeUtils.java    | 164 ++++++++++-----------
 .../apache/hudi/hadoop/TestHoodieHiveRecord.java   |   9 +-
 .../hudi/hadoop/utils/TestHiveAvroSerializer.java  |  53 ++++---
 .../utils/TestHoodieArrayWritableSchemaUtils.java  |   8 +-
 11 files changed, 205 insertions(+), 195 deletions(-)

diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
index 05d2c898f868..47b4114dea06 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
@@ -121,7 +121,7 @@ public class TestHoodieFileGroupReaderOnHive extends 
HoodieFileGroupReaderOnJava
     List<HoodieSchemaField> fields = schema.getFields();
     setHiveColumnNameProps(fields, jobConf, USE_FAKE_PARTITION);
     try {
-      String columnTypes = 
HiveTypeUtils.generateColumnTypes(schema.toAvroSchema()).stream().map(TypeInfo::getTypeName).collect(Collectors.joining(","));
+      String columnTypes = 
HiveTypeUtils.generateColumnTypes(schema).stream().map(TypeInfo::getTypeName).collect(Collectors.joining(","));
       jobConf.set("columns.types", columnTypes + ",string");
     } catch (SerDeException e) {
       throw new RuntimeException(e);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
index 9fe1ea3f0650..947a292196ae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieEmptyRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.AvroJavaTypeConverter;
@@ -32,7 +33,6 @@ import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
@@ -70,22 +70,20 @@ public class AvroRecordContext extends 
RecordContext<IndexedRecord> {
   public static Object getFieldValueFromIndexedRecord(
       IndexedRecord record,
       String fieldName) {
-    Schema currentSchema = record.getSchema();
+    HoodieSchema currentSchema = 
HoodieSchema.fromAvroSchema(record.getSchema());
     IndexedRecord currentRecord = record;
     String[] path = fieldName.split("\\.");
     for (int i = 0; i < path.length; i++) {
-      if (currentSchema.isUnion()) {
-        currentSchema = AvroSchemaUtils.getNonNullTypeFromUnion(currentSchema);
-      }
-      Schema.Field field = currentSchema.getField(path[i]);
-      if (field == null) {
+      currentSchema = currentSchema.getNonNullType();
+      Option<HoodieSchemaField> fieldOpt = currentSchema.getField(path[i]);
+      if (fieldOpt.isEmpty()) {
         return null;
       }
-      Object value = currentRecord.get(field.pos());
+      Object value = currentRecord.get(fieldOpt.get().pos());
       if (i == path.length - 1) {
         return value;
       }
-      currentSchema = field.schema();
+      currentSchema = fieldOpt.get().schema();
       currentRecord = (IndexedRecord) value;
     }
     return null;
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index 4614854d4df3..38cd64c13f37 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -101,7 +101,7 @@ public class HiveHoodieReaderContext extends 
HoodieReaderContext<ArrayWritable>
     jobConf.set(serdeConstants.LIST_COLUMNS, String.join(",", 
dataColumnNameList));
     List<TypeInfo> columnTypes;
     try {
-      columnTypes = 
HiveTypeUtils.generateColumnTypes(dataSchema.toAvroSchema());
+      columnTypes = HiveTypeUtils.generateColumnTypes(dataSchema);
     } catch (AvroSerdeException e) {
       throw new HoodieAvroSchemaException(String.format("Failed to generate 
hive column types from schema: %s, due to %s", dataSchema, e));
     }
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
index 8f52b290354b..bc12027921f6 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
@@ -31,7 +31,6 @@ import org.apache.hudi.hadoop.utils.HiveJavaTypeConverter;
 import org.apache.hudi.hadoop.utils.HoodieArrayWritableSchemaUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -56,9 +55,9 @@ public class HiveRecordContext extends 
RecordContext<ArrayWritable> {
     return FIELD_ACCESSOR_INSTANCE;
   }
 
-  private final Map<Schema, HiveAvroSerializer> serializerCache = new 
ConcurrentHashMap<>();
+  private final Map<HoodieSchema, HiveAvroSerializer> serializerCache = new 
ConcurrentHashMap<>();
 
-  private HiveAvroSerializer getHiveAvroSerializer(Schema schema) {
+  private HiveAvroSerializer getHiveAvroSerializer(HoodieSchema schema) {
     return serializerCache.computeIfAbsent(schema, HiveAvroSerializer::new);
   }
 
@@ -72,7 +71,7 @@ public class HiveRecordContext extends 
RecordContext<ArrayWritable> {
 
   @Override
   public Object getValue(ArrayWritable record, HoodieSchema schema, String 
fieldName) {
-    return getHiveAvroSerializer(schema.toAvroSchema()).getValue(record, 
fieldName);
+    return getHiveAvroSerializer(schema).getValue(record, fieldName);
   }
 
   @Override
@@ -92,7 +91,7 @@ public class HiveRecordContext extends 
RecordContext<ArrayWritable> {
     }
     HoodieSchema schema = getSchemaFromBufferRecord(bufferedRecord);
     ArrayWritable writable = bufferedRecord.getRecord();
-    return new HoodieHiveRecord(key, writable, schema.toAvroSchema(), 
getHiveAvroSerializer(schema.toAvroSchema()),
+    return new HoodieHiveRecord(key, writable, schema, 
getHiveAvroSerializer(schema),
         bufferedRecord.getHoodieOperation(), 
bufferedRecord.getOrderingValue(), bufferedRecord.isDelete());
   }
 
@@ -143,7 +142,7 @@ public class HiveRecordContext extends 
RecordContext<ArrayWritable> {
 
   @Override
   public GenericRecord convertToAvroRecord(ArrayWritable record, HoodieSchema 
schema) {
-    return getHiveAvroSerializer(schema.toAvroSchema()).serialize(record);
+    return getHiveAvroSerializer(schema).serialize(record);
   }
 
   @Override
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
index 1ee19c32cecf..a3f5f857dc57 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.read.DeleteContext;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.OrderingValues;
@@ -60,9 +61,9 @@ public class HoodieHiveRecord extends 
HoodieRecord<ArrayWritable> {
 
   private final HiveAvroSerializer avroSerializer;
 
-  protected Schema schema;
+  protected HoodieSchema schema;
 
-  public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, 
HiveAvroSerializer avroSerializer) {
+  public HoodieHiveRecord(HoodieKey key, ArrayWritable data, HoodieSchema 
schema, HiveAvroSerializer avroSerializer) {
     super(key, data);
     this.avroSerializer = avroSerializer;
     this.schema = schema;
@@ -70,7 +71,7 @@ public class HoodieHiveRecord extends 
HoodieRecord<ArrayWritable> {
     isDelete = data == null;
   }
 
-  public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, 
HiveAvroSerializer avroSerializer, HoodieOperation hoodieOperation, Comparable 
orderingValue, boolean isDelete) {
+  public HoodieHiveRecord(HoodieKey key, ArrayWritable data, HoodieSchema 
schema, HiveAvroSerializer avroSerializer, HoodieOperation hoodieOperation, 
Comparable orderingValue, boolean isDelete) {
     super(key, data, hoodieOperation, isDelete, Option.empty());
     this.orderingValue = orderingValue;
     this.avroSerializer = avroSerializer;
@@ -78,7 +79,7 @@ public class HoodieHiveRecord extends 
HoodieRecord<ArrayWritable> {
     this.copy = false;
   }
 
-  private HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, 
HoodieOperation operation, boolean isCopy,
+  private HoodieHiveRecord(HoodieKey key, ArrayWritable data, HoodieSchema 
schema, HoodieOperation operation, boolean isCopy,
                            HiveAvroSerializer avroSerializer) {
     super(key, data, operation, Option.empty());
     this.schema = schema;
@@ -246,7 +247,7 @@ public class HoodieHiveRecord extends 
HoodieRecord<ArrayWritable> {
     return avroSerializer.getValue(data, name);
   }
 
-  protected Schema getSchema() {
+  protected HoodieSchema getSchema() {
     return schema;
   }
 }
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index fe0c91f6f3a1..d910145bd464 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -205,7 +205,7 @@ public class RealtimeCompactedRecordReader extends 
AbstractRealtimeRecordReader
     // for presto engine, the hiveSchema will be: col1,col2, but the 
writerSchema will be col1,col2,par
     // so to be compatible with hive and presto, we should rewrite oldRecord 
before we call combineAndGetUpdateValue,
     // once presto on hudi have its own mor reader, we can remove the rewrite 
logical.
-    GenericRecord genericRecord = 
HiveAvroSerializer.rewriteRecordIgnoreResultCheck(oldRecord, 
getLogScannerReaderSchema().toAvroSchema());
+    GenericRecord genericRecord = 
HiveAvroSerializer.rewriteRecordIgnoreResultCheck(oldRecord, 
getLogScannerReaderSchema());
     RecordContext<IndexedRecord> recordContext = 
AvroRecordContext.getFieldAccessorInstance();
     BufferedRecord record = BufferedRecords.fromEngineRecord(genericRecord, 
HoodieSchema.fromAvroSchema(genericRecord.getSchema()), recordContext, 
orderingFields, newRecord.getRecordKey(), false);
     BufferedRecord newBufferedRecord = 
BufferedRecords.fromHoodieRecord(newRecord, 
HoodieSchema.fromAvroSchema(getLogScannerReaderSchema().toAvroSchema()),
@@ -218,7 +218,7 @@ public class RealtimeCompactedRecordReader extends 
AbstractRealtimeRecordReader
   }
 
   private GenericRecord convertArrayWritableToHoodieRecord(ArrayWritable 
arrayWritable) {
-    GenericRecord record = serializer.serialize(arrayWritable, 
getHiveSchema().toAvroSchema());
+    GenericRecord record = serializer.serialize(arrayWritable, 
getHiveSchema());
     return record;
   }
 
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java
index 16690ac360b5..96210d4b9956 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java
@@ -18,14 +18,16 @@
 
 package org.apache.hudi.hadoop.utils;
 
-import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieAvroSchemaException;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
 
-import org.apache.avro.JsonProperties;
-import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericEnumSymbol;
@@ -62,6 +64,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -75,17 +78,16 @@ public class HiveAvroSerializer {
   private final List<String> columnNames;
   private final List<TypeInfo> columnTypes;
   private final ArrayWritableObjectInspector objectInspector;
-  private final Schema recordSchema;
+  private final HoodieSchema recordSchema;
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HiveAvroSerializer.class);
 
-  public HiveAvroSerializer(Schema schema) {
-    schema = AvroSchemaUtils.getNonNullTypeFromUnion(schema);
-    if (schema.getType() != Schema.Type.RECORD) {
+  public HiveAvroSerializer(HoodieSchema schema) {
+    if (schema.getNonNullType().getType() != HoodieSchemaType.RECORD) {
       throw new IllegalArgumentException("Expected record schema, but got: " + 
schema);
     }
     this.recordSchema = schema;
-    this.columnNames = 
schema.getFields().stream().map(Schema.Field::name).map(String::toLowerCase).collect(Collectors.toList());
+    this.columnNames = 
schema.getFields().stream().map(HoodieSchemaField::name).map(String::toLowerCase).collect(Collectors.toList());
     try {
       this.columnTypes = HiveTypeUtils.generateColumnTypes(schema);
     } catch (AvroSerdeException e) {
@@ -171,7 +173,7 @@ public class HiveAvroSerializer {
             + "', but got " + context.typeInfo.getTypeName());
       }
 
-      if (!(context.schema.getType() == Schema.Type.RECORD)) {
+      if (!(context.schema.getType() == HoodieSchemaType.RECORD)) {
         throw new HoodieException("Expected RecordSchema while resolving '" + 
path[i]
             + "', but got " + context.schema.getType());
       }
@@ -184,15 +186,13 @@ public class HiveAvroSerializer {
   }
 
   private FieldContext extractFieldFromRecord(ArrayWritable record, 
StructObjectInspector structObjectInspector,
-                                              List<TypeInfo> fieldTypes, 
Schema schema, String fieldName) {
-    Schema.Field schemaField = schema.getField(fieldName);
-    if (schemaField == null) {
-      throw new HoodieException("Field '" + fieldName + "' not found in 
schema: " + schema);
-    }
+                                              List<TypeInfo> fieldTypes, 
HoodieSchema schema, String fieldName) {
+    HoodieSchemaField schemaField = schema.getField(fieldName)
+        .orElseThrow(() -> new HoodieException("Field '" + fieldName + "' not 
found in schema: " + schema));
 
     int fieldIdx = schemaField.pos();
     TypeInfo fieldTypeInfo = fieldTypes.get(fieldIdx);
-    Schema fieldSchema = 
AvroSchemaUtils.getNonNullTypeFromUnion(schemaField.schema());
+    HoodieSchema fieldSchema = schemaField.schema().getNonNullType();
 
     StructField structField = 
structObjectInspector.getStructFieldRef(fieldName);
     if (structField == null) {
@@ -216,9 +216,9 @@ public class HiveAvroSerializer {
     final TypeInfo typeInfo;
     final ObjectInspector objectInspector;
     final Object object;
-    final Schema schema;
+    final HoodieSchema schema;
 
-    FieldContext(Object object, ObjectInspector objectInspector, TypeInfo 
typeInfo,  Schema schema) {
+    FieldContext(Object object, ObjectInspector objectInspector, TypeInfo 
typeInfo,  HoodieSchema schema) {
       this.object = object;
       this.objectInspector = objectInspector;
       this.typeInfo = typeInfo;
@@ -226,7 +226,7 @@ public class HiveAvroSerializer {
     }
   }
 
-  private static final Schema STRING_SCHEMA = 
Schema.create(Schema.Type.STRING);
+  private static final HoodieSchema STRING_SCHEMA = 
HoodieSchema.create(HoodieSchemaType.STRING);
 
   public GenericRecord serialize(Object o) {
     if (recordSchema == null) {
@@ -235,10 +235,10 @@ public class HiveAvroSerializer {
     return serialize(o, recordSchema);
   }
 
-  public GenericRecord serialize(Object o, Schema schema) {
+  public GenericRecord serialize(Object o, HoodieSchema schema) {
 
     StructObjectInspector soi = objectInspector;
-    GenericData.Record record = new GenericData.Record(schema);
+    GenericData.Record record = new GenericData.Record(schema.toAvroSchema());
 
     List<? extends StructField> outputFieldRefs = soi.getAllStructFieldRefs();
     if (outputFieldRefs.size() != columnNames.size()) {
@@ -251,7 +251,7 @@ public class HiveAvroSerializer {
     List<Object> structFieldsDataAsList = soi.getStructFieldsDataAsList(o);
 
     for (int i = 0; i < size; i++) {
-      Schema.Field field = schema.getFields().get(i);
+      HoodieSchemaField field = schema.getFields().get(i);
       if (i >= columnTypes.size()) {
         break;
       }
@@ -268,28 +268,34 @@ public class HiveAvroSerializer {
     return record;
   }
 
-  private void setUpRecordFieldFromWritable(TypeInfo typeInfo, Object 
structFieldData, ObjectInspector fieldOI, GenericData.Record record, 
Schema.Field field) {
+  private void setUpRecordFieldFromWritable(TypeInfo typeInfo, Object 
structFieldData, ObjectInspector fieldOI, GenericData.Record record, 
HoodieSchemaField field) {
     Object val = serialize(typeInfo, fieldOI, structFieldData, field.schema());
     if (val == null) {
-      if (field.defaultVal() instanceof JsonProperties.Null) {
+      Option<Object> defaultValOpt = field.defaultVal();
+      // In Avro/HoodieSchema, field.defaultVal() returns:
+      // - JsonProperties.Null / HoodieSchema.NULL_VALUE = if default is 
explicitly null
+      // - null / isEmpty() = if field has NO default value
+      // - some value = if field has an actual default
+      if (defaultValOpt.isPresent() && defaultValOpt.get() == 
HoodieSchema.NULL_VALUE) {
         record.put(field.name(), null);
       } else {
-        record.put(field.name(), field.defaultVal());
+        // is not present or has some value
+        record.put(field.name(), defaultValOpt.orElse(null));
       }
     } else {
       record.put(field.name(), val);
     }
   }
 
-  private Object serialize(TypeInfo typeInfo, ObjectInspector fieldOI, Object 
structFieldData, Schema schema) throws HoodieException {
+  private Object serialize(TypeInfo typeInfo, ObjectInspector fieldOI, Object 
structFieldData, HoodieSchema schema) throws HoodieException {
     if (null == structFieldData) {
       return null;
     }
 
-    schema = AvroSchemaUtils.getNonNullTypeFromUnion(schema);
+    schema = schema.getNonNullType();
 
     /* Because we use Hive's 'string' type when Avro calls for enum, we have 
to expressly check for enum-ness */
-    if (Schema.Type.ENUM.equals(schema.getType())) {
+    if (HoodieSchemaType.ENUM == schema.getType()) {
       assert fieldOI instanceof PrimitiveObjectInspector;
       return serializeEnum((PrimitiveObjectInspector) fieldOI, 
structFieldData, schema);
     }
@@ -339,48 +345,48 @@ public class HiveAvroSerializer {
     }
   };
 
-  private Object serializeEnum(PrimitiveObjectInspector fieldOI, Object 
structFieldData, Schema schema) throws HoodieException {
+  private Object serializeEnum(PrimitiveObjectInspector fieldOI, Object 
structFieldData, HoodieSchema schema) throws HoodieException {
     try {
-      return enums.retrieve(schema).retrieve(serializePrimitive(fieldOI, 
structFieldData, schema));
+      return 
enums.retrieve(schema.toAvroSchema()).retrieve(serializePrimitive(fieldOI, 
structFieldData, schema));
     } catch (Exception e) {
       throw new HoodieException(e);
     }
   }
 
-  private Object serializeStruct(StructTypeInfo typeInfo, 
StructObjectInspector ssoi, Object o, Schema schema) {
+  private Object serializeStruct(StructTypeInfo typeInfo, 
StructObjectInspector ssoi, Object o, HoodieSchema schema) {
     int size = schema.getFields().size();
     List<? extends StructField> allStructFieldRefs = 
ssoi.getAllStructFieldRefs();
     List<Object> structFieldsDataAsList = ssoi.getStructFieldsDataAsList(o);
-    GenericData.Record record = new GenericData.Record(schema);
+    GenericData.Record record = new GenericData.Record(schema.toAvroSchema());
     ArrayList<TypeInfo> allStructFieldTypeInfos = 
typeInfo.getAllStructFieldTypeInfos();
 
     for (int i = 0; i < size; i++) {
-      Schema.Field field = schema.getFields().get(i);
+      HoodieSchemaField field = schema.getFields().get(i);
       setUpRecordFieldFromWritable(allStructFieldTypeInfos.get(i), 
structFieldsDataAsList.get(i),
           allStructFieldRefs.get(i).getFieldObjectInspector(), record, field);
     }
     return record;
   }
 
-  private Object serializePrimitive(PrimitiveObjectInspector fieldOI, Object 
structFieldData, Schema schema) throws HoodieException {
+  private Object serializePrimitive(PrimitiveObjectInspector fieldOI, Object 
structFieldData, HoodieSchema schema) throws HoodieException {
     switch (fieldOI.getPrimitiveCategory()) {
       case BINARY:
-        if (schema.getType() == Schema.Type.BYTES) {
+        if (schema.getType() == HoodieSchemaType.BYTES) {
           return AvroSerdeUtils.getBufferFromBytes((byte[]) 
fieldOI.getPrimitiveJavaObject(structFieldData));
-        } else if (schema.getType() == Schema.Type.FIXED) {
-          GenericData.Fixed fixed = new GenericData.Fixed(schema, (byte[]) 
fieldOI.getPrimitiveJavaObject(structFieldData));
+        } else if (schema.getType() == HoodieSchemaType.FIXED) {
+          GenericData.Fixed fixed = new 
GenericData.Fixed(schema.toAvroSchema(), (byte[]) 
fieldOI.getPrimitiveJavaObject(structFieldData));
           return fixed;
         } else {
           throw new HoodieException("Unexpected Avro schema for Binary 
TypeInfo: " + schema.getType());
         }
       case DECIMAL:
         HiveDecimal dec = (HiveDecimal) 
fieldOI.getPrimitiveJavaObject(structFieldData);
-        LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
schema.getLogicalType();
+        HoodieSchema.Decimal decimal = (HoodieSchema.Decimal) schema;
         BigDecimal bd = new 
BigDecimal(dec.toString()).setScale(decimal.getScale());
-        if (schema.getType() == Schema.Type.BYTES) {
-          return HoodieAvroUtils.DECIMAL_CONVERSION.toBytes(bd, schema, 
decimal);
+        if (schema.getType() == HoodieSchemaType.BYTES) {
+          return HoodieAvroUtils.DECIMAL_CONVERSION.toBytes(bd, 
schema.toAvroSchema(), decimal.toAvroSchema().getLogicalType());
         } else {
-          return HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, schema, 
decimal);
+          return HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, 
schema.toAvroSchema(), decimal.toAvroSchema().getLogicalType());
         }
       case CHAR:
         HiveChar ch = (HiveChar) 
fieldOI.getPrimitiveJavaObject(structFieldData);
@@ -396,7 +402,7 @@ public class HiveAvroSerializer {
       case TIMESTAMP:
         return HoodieHiveUtils.getMills(structFieldData);
       case INT:
-        if (schema.getLogicalType() != null && 
schema.getLogicalType().getName().equals("date")) {
+        if (schema.getType() != null && schema.getType() == 
HoodieSchemaType.DATE) {
           return new 
WritableDateObjectInspector().getPrimitiveWritableObject(structFieldData).getDays();
         }
         return fieldOI.getPrimitiveJavaObject(structFieldData);
@@ -409,7 +415,7 @@ public class HiveAvroSerializer {
     }
   }
 
-  private Object serializeUnion(UnionTypeInfo typeInfo, UnionObjectInspector 
fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+  private Object serializeUnion(UnionTypeInfo typeInfo, UnionObjectInspector 
fieldOI, Object structFieldData, HoodieSchema schema) throws HoodieException {
     byte tag = fieldOI.getTag(structFieldData);
 
     // Invariant that Avro's tag ordering must match Hive's.
@@ -419,20 +425,20 @@ public class HiveAvroSerializer {
         schema.getTypes().get(tag));
   }
 
-  private Object serializeList(ListTypeInfo typeInfo, ListObjectInspector 
fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+  private Object serializeList(ListTypeInfo typeInfo, ListObjectInspector 
fieldOI, Object structFieldData, HoodieSchema schema) throws HoodieException {
     List<?> list = fieldOI.getList(structFieldData);
-    List<Object> deserialized = new GenericData.Array<Object>(list.size(), 
schema);
+    List<Object> deserialized = new GenericData.Array<>(list.size(), 
schema.toAvroSchema());
 
     TypeInfo listElementTypeInfo = typeInfo.getListElementTypeInfo();
     ObjectInspector listElementObjectInspector = 
fieldOI.getListElementObjectInspector();
     // NOTE: We have to resolve nullable schema, since Avro permits array 
elements
     //       to be null
-    Schema arrayNestedType = 
AvroSchemaUtils.getNonNullTypeFromUnion(schema.getElementType());
-    Schema elementType;
+    HoodieSchema arrayNestedType = schema.getElementType().getNonNullType();
+    HoodieSchema elementType;
     if (listElementObjectInspector.getCategory() == 
ObjectInspector.Category.PRIMITIVE) {
       elementType = arrayNestedType;
     } else {
-      elementType = arrayNestedType.getField("element") == null ? 
arrayNestedType : arrayNestedType.getField("element").schema();
+      elementType = arrayNestedType.getField("element").isEmpty() ? 
arrayNestedType : arrayNestedType.getField("element").get().schema();
     }
     for (int i = 0; i < list.size(); i++) {
       Object childFieldData = list.get(i);
@@ -445,7 +451,7 @@ public class HiveAvroSerializer {
     return deserialized;
   }
 
-  private Object serializeMap(MapTypeInfo typeInfo, MapObjectInspector 
fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+  private Object serializeMap(MapTypeInfo typeInfo, MapObjectInspector 
fieldOI, Object structFieldData, HoodieSchema schema) throws HoodieException {
     // Avro only allows maps with string keys
     if (!mapHasStringKey(fieldOI.getMapKeyObjectInspector())) {
       throw new HoodieException("Avro only supports maps with keys as Strings. 
 Current Map is: " + typeInfo.toString());
@@ -456,7 +462,7 @@ public class HiveAvroSerializer {
     TypeInfo mapKeyTypeInfo = typeInfo.getMapKeyTypeInfo();
     TypeInfo mapValueTypeInfo = typeInfo.getMapValueTypeInfo();
     Map<?, ?> map = fieldOI.getMap(structFieldData);
-    Schema valueType = schema.getValueType();
+    HoodieSchema valueType = schema.getValueType();
 
     Map<Object, Object> deserialized = new LinkedHashMap<Object, 
Object>(fieldOI.getMapSize(structFieldData));
 
@@ -473,10 +479,10 @@ public class HiveAvroSerializer {
         && ((PrimitiveObjectInspector) 
mapKeyObjectInspector).getPrimitiveCategory().equals(PrimitiveObjectInspector.PrimitiveCategory.STRING);
   }
 
-  public static GenericRecord rewriteRecordIgnoreResultCheck(GenericRecord 
oldRecord, Schema newSchema) {
-    GenericRecord newRecord = new GenericData.Record(newSchema);
+  public static GenericRecord rewriteRecordIgnoreResultCheck(GenericRecord 
oldRecord, HoodieSchema newSchema) {
+    GenericRecord newRecord = new GenericData.Record(newSchema.toAvroSchema());
     boolean isSpecificRecord = oldRecord instanceof SpecificRecordBase;
-    for (Schema.Field f : newSchema.getFields()) {
+    for (HoodieSchemaField f : newSchema.getFields()) {
       if (!(isSpecificRecord && isMetadataField(f.name()))) {
         copyOldValueOrSetDefault(oldRecord, newRecord, f);
       }
@@ -484,7 +490,7 @@ public class HiveAvroSerializer {
     return newRecord;
   }
 
-  private static void copyOldValueOrSetDefault(GenericRecord oldRecord, 
GenericRecord newRecord, Schema.Field field) {
+  private static void copyOldValueOrSetDefault(GenericRecord oldRecord, 
GenericRecord newRecord, HoodieSchemaField field) {
     Schema oldSchema = oldRecord.getSchema();
     Object fieldValue = oldSchema.getField(field.name()) == null ? null : 
oldRecord.get(field.name());
 
@@ -493,12 +499,16 @@ public class HiveAvroSerializer {
       Object newFieldValue;
       if (fieldValue instanceof GenericRecord) {
         GenericRecord record = (GenericRecord) fieldValue;
-        newFieldValue = rewriteRecordIgnoreResultCheck(record, 
AvroSchemaUtils.resolveUnionSchema(field.schema(), 
record.getSchema().getFullName()));
+        HoodieSchema nonNullFieldSchema = field.schema().getNonNullType();
+        if (!Objects.equals(nonNullFieldSchema.getFullName(), 
record.getSchema().getFullName())) {
+          throw new HoodieSchemaException(String.format("Unsupported UNION 
type %s: Only UNION of a null type and a non-null type is supported", 
field.schema()));
+        }
+        newFieldValue = rewriteRecordIgnoreResultCheck(record, 
nonNullFieldSchema);
       } else {
         newFieldValue = fieldValue;
       }
       newRecord.put(field.name(), newFieldValue);
-    } else if (field.defaultVal() instanceof JsonProperties.Null) {
+    } else if (field.defaultVal() == HoodieSchema.NULL_VALUE) {
       newRecord.put(field.name(), null);
     } else {
       newRecord.put(field.name(), field.defaultVal());
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveTypeUtils.java 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveTypeUtils.java
index a5383b63ebb6..bcea1d628de7 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveTypeUtils.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveTypeUtils.java
@@ -18,17 +18,17 @@
 
 package org.apache.hudi.hadoop.utils;
 
-import org.apache.hudi.avro.AvroSchemaUtils;
-
-import static org.apache.avro.Schema.Type.BOOLEAN;
-import static org.apache.avro.Schema.Type.BYTES;
-import static org.apache.avro.Schema.Type.DOUBLE;
-import static org.apache.avro.Schema.Type.FIXED;
-import static org.apache.avro.Schema.Type.FLOAT;
-import static org.apache.avro.Schema.Type.INT;
-import static org.apache.avro.Schema.Type.LONG;
-import static org.apache.avro.Schema.Type.NULL;
-import static org.apache.avro.Schema.Type.STRING;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.ValidationUtils;
+
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
+import org.apache.hadoop.hive.serde2.avro.InstanceCache;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -38,13 +38,22 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.avro.Schema;
-import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
-import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
-import org.apache.hadoop.hive.serde2.avro.InstanceCache;
-import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import static org.apache.hudi.common.schema.HoodieSchemaType.ARRAY;
+import static org.apache.hudi.common.schema.HoodieSchemaType.BOOLEAN;
+import static org.apache.hudi.common.schema.HoodieSchemaType.BYTES;
+import static org.apache.hudi.common.schema.HoodieSchemaType.DATE;
+import static org.apache.hudi.common.schema.HoodieSchemaType.DECIMAL;
+import static org.apache.hudi.common.schema.HoodieSchemaType.DOUBLE;
+import static org.apache.hudi.common.schema.HoodieSchemaType.ENUM;
+import static org.apache.hudi.common.schema.HoodieSchemaType.FIXED;
+import static org.apache.hudi.common.schema.HoodieSchemaType.FLOAT;
+import static org.apache.hudi.common.schema.HoodieSchemaType.INT;
+import static org.apache.hudi.common.schema.HoodieSchemaType.LONG;
+import static org.apache.hudi.common.schema.HoodieSchemaType.MAP;
+import static org.apache.hudi.common.schema.HoodieSchemaType.NULL;
+import static org.apache.hudi.common.schema.HoodieSchemaType.RECORD;
+import static org.apache.hudi.common.schema.HoodieSchemaType.STRING;
+import static org.apache.hudi.common.schema.HoodieSchemaType.UNION;
 
 /**
  * Convert an Avro Schema to a Hive TypeInfo
@@ -66,9 +75,9 @@ public class HiveTypeUtils {
   //                  smallint
 
   // Map of Avro's primitive types to Hives (for those that are supported by 
both)
-  private static final Map<Schema.Type, TypeInfo> PRIMITIVE_TYPE_TO_TYPE_INFO 
= initTypeMap();
-  private static Map<Schema.Type, TypeInfo> initTypeMap() {
-    Map<Schema.Type, TypeInfo> theMap = new Hashtable<Schema.Type, TypeInfo>();
+  private static final Map<HoodieSchemaType, TypeInfo> 
PRIMITIVE_TYPE_TO_TYPE_INFO = initTypeMap();
+  private static Map<HoodieSchemaType, TypeInfo> initTypeMap() {
+    Map<HoodieSchemaType, TypeInfo> theMap = new Hashtable<>();
     theMap.put(NULL, TypeInfoFactory.getPrimitiveTypeInfo("void"));
     theMap.put(BOOLEAN, TypeInfoFactory.getPrimitiveTypeInfo("boolean"));
     theMap.put(INT, TypeInfoFactory.getPrimitiveTypeInfo("int"));
@@ -90,7 +99,7 @@ public class HiveTypeUtils {
    *         from the schema.
    * @throws AvroSerdeException for problems during conversion.
    */
-  public static List<TypeInfo> generateColumnTypes(Schema schema) throws 
AvroSerdeException {
+  public static List<TypeInfo> generateColumnTypes(HoodieSchema schema) throws 
AvroSerdeException {
     return generateColumnTypes(schema, null);
   }
 
@@ -105,23 +114,23 @@ public class HiveTypeUtils {
    *         from the schema.
    * @throws AvroSerdeException for problems during conversion.
    */
-  public static List<TypeInfo> generateColumnTypes(Schema schema,
-                                                   Set<Schema> seenSchemas) 
throws AvroSerdeException {
-    List<Schema.Field> fields = schema.getFields();
+  public static List<TypeInfo> generateColumnTypes(HoodieSchema schema,
+                                                   Set<HoodieSchema> 
seenSchemas) throws AvroSerdeException {
+    List<HoodieSchemaField> fields = schema.getFields();
 
     List<TypeInfo> types = new ArrayList<TypeInfo>(fields.size());
 
-    for (Schema.Field field : fields) {
+    for (HoodieSchemaField field : fields) {
       types.add(generateTypeInfo(field.schema(), seenSchemas));
     }
 
     return types;
   }
 
-  static InstanceCache<Schema, TypeInfo> typeInfoCache = new 
InstanceCache<Schema, TypeInfo>() {
+  static InstanceCache<HoodieSchema, TypeInfo> typeInfoCache = new 
InstanceCache<HoodieSchema, TypeInfo>() {
     @Override
-    protected TypeInfo makeInstance(Schema s,
-                                    Set<Schema> seenSchemas)
+    protected TypeInfo makeInstance(HoodieSchema s,
+                                    Set<HoodieSchema> seenSchemas)
         throws AvroSerdeException {
       return generateTypeInfoWorker(s, seenSchemas);
     }
@@ -134,33 +143,25 @@ public class HiveTypeUtils {
    * @return TypeInfo matching the Avro schema
    * @throws AvroSerdeException for any problems during conversion.
    */
-  public static TypeInfo generateTypeInfo(Schema schema,
-                                          Set<Schema> seenSchemas) throws 
AvroSerdeException {
+  public static TypeInfo generateTypeInfo(HoodieSchema schema,
+                                          Set<HoodieSchema> seenSchemas) 
throws AvroSerdeException {
     // For bytes type, it can be mapped to decimal.
-    Schema.Type type = schema.getType();
-    // HUDI MODIFICATION ADDED "|| type == FIXED"
-    if ((type == BYTES || type == FIXED) && AvroSerDe.DECIMAL_TYPE_NAME
-        .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
-      int precision = 0;
-      int scale = 0;
-      try {
-        precision = 
getIntValue(schema.getObjectProp(AvroSerDe.AVRO_PROP_PRECISION));
-        scale = getIntValue(schema.getObjectProp(AvroSerDe.AVRO_PROP_SCALE));
-      } catch (Exception ex) {
-        throw new AvroSerdeException("Failed to obtain scale value from file 
schema: " + schema, ex);
-      }
-
+    HoodieSchemaType type = schema.getType();
+    if (type == DECIMAL && AvroSerDe.DECIMAL_TYPE_NAME
+        .equalsIgnoreCase((String) 
schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+      HoodieSchema.Decimal decimalSchema = (HoodieSchema.Decimal) schema;
+      int precision = decimalSchema.getPrecision();
+      int scale = decimalSchema.getScale();
       try {
         HiveDecimalUtils.validateParameter(precision, scale);
       } catch (Exception ex) {
         throw new AvroSerdeException("Invalid precision or scale for decimal 
type", ex);
       }
-
       return TypeInfoFactory.getDecimalTypeInfo(precision, scale);
     }
 
     if (type == STRING
-        && 
AvroSerDe.CHAR_TYPE_NAME.equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE)))
 {
+        && AvroSerDe.CHAR_TYPE_NAME.equalsIgnoreCase((String) 
schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
       int maxLength = 0;
       try {
         maxLength = getIntFromSchema(schema, AvroSerDe.AVRO_PROP_MAX_LENGTH);
@@ -171,7 +172,7 @@ public class HiveTypeUtils {
     }
 
     if (type == STRING && AvroSerDe.VARCHAR_TYPE_NAME
-        .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+        .equalsIgnoreCase((String) 
schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
       int maxLength = 0;
       try {
         maxLength = getIntFromSchema(schema, AvroSerDe.AVRO_PROP_MAX_LENGTH);
@@ -181,13 +182,13 @@ public class HiveTypeUtils {
       return TypeInfoFactory.getVarcharTypeInfo(maxLength);
     }
 
-    if (type == INT
-        && 
AvroSerDe.DATE_TYPE_NAME.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE)))
 {
+    if (type == DATE
+        && AvroSerDe.DATE_TYPE_NAME.equals((String) 
schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
       return TypeInfoFactory.dateTypeInfo;
     }
 
     if (type == LONG
-        && 
AvroSerDe.TIMESTAMP_TYPE_NAME.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE)))
 {
+        && AvroSerDe.TIMESTAMP_TYPE_NAME.equals((String) 
schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
       return TypeInfoFactory.timestampTypeInfo;
     }
 
@@ -224,8 +225,8 @@ public class HiveTypeUtils {
   }
 
   // added this from AvroSerdeUtils in hive latest
-  public static int getIntFromSchema(Schema schema, String name) {
-    Object obj = schema.getObjectProp(name);
+  public static int getIntFromSchema(HoodieSchema schema, String name) {
+    Object obj = schema.getProp(name);
     if (obj instanceof String) {
       return Integer.parseInt((String) obj);
     } else if (obj instanceof Integer) {
@@ -236,15 +237,15 @@ public class HiveTypeUtils {
     }
   }
 
-  private static TypeInfo generateTypeInfoWorker(Schema schema,
-                                                 Set<Schema> seenSchemas) 
throws AvroSerdeException {
-    // Avro requires NULLable types to be defined as unions of some type T
+  private static TypeInfo generateTypeInfoWorker(HoodieSchema schema,
+                                                 Set<HoodieSchema> 
seenSchemas) throws AvroSerdeException {
+    // HoodieSchema requires NULLable types to be defined as unions of some 
type T
     // and NULL.  This is annoying and we're going to hide it from the user.
-    if (AvroSchemaUtils.isNullable(schema)) {
-      return generateTypeInfo(AvroSchemaUtils.getNonNullTypeFromUnion(schema), 
seenSchemas);
+    if (schema.isNullable()) {
+      return generateTypeInfo(schema.getNonNullType(), seenSchemas);
     }
 
-    Schema.Type type = schema.getType();
+    HoodieSchemaType type = schema.getType();
     if (PRIMITIVE_TYPE_TO_TYPE_INFO.containsKey(type)) {
       return PRIMITIVE_TYPE_TO_TYPE_INFO.get(type);
     }
@@ -259,12 +260,12 @@ public class HiveTypeUtils {
     }
   }
 
-  private static TypeInfo generateRecordTypeInfo(Schema schema,
-                                                 Set<Schema> seenSchemas) 
throws AvroSerdeException {
-    assert schema.getType().equals(Schema.Type.RECORD);
+  private static TypeInfo generateRecordTypeInfo(HoodieSchema schema,
+                                                 Set<HoodieSchema> 
seenSchemas) throws AvroSerdeException {
+    ValidationUtils.checkArgument(schema.getType() == RECORD, schema + " is 
not a RECORD");
 
     if (seenSchemas == null) {
-      seenSchemas = Collections.newSetFromMap(new IdentityHashMap<Schema, 
Boolean>());
+      seenSchemas = Collections.newSetFromMap(new IdentityHashMap<>());
     } else if (seenSchemas.contains(schema)) {
       throw new AvroSerdeException(
           "Recursive schemas are not supported. Recursive schema was " + schema
@@ -272,9 +273,9 @@ public class HiveTypeUtils {
     }
     seenSchemas.add(schema);
 
-    List<Schema.Field> fields = schema.getFields();
-    List<String> fieldNames = new ArrayList<String>(fields.size());
-    List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(fields.size());
+    List<HoodieSchemaField> fields = schema.getFields();
+    List<String> fieldNames = new ArrayList<>(fields.size());
+    List<TypeInfo> typeInfos = new ArrayList<>(fields.size());
 
     for (int i = 0; i < fields.size(); i++) {
       fieldNames.add(i, fields.get(i).name());
@@ -288,33 +289,32 @@ public class HiveTypeUtils {
    * Generate a TypeInfo for an Avro Map.  This is made slightly simpler in 
that
    * Avro only allows maps with strings for keys.
    */
-  private static TypeInfo generateMapTypeInfo(Schema schema,
-                                              Set<Schema> seenSchemas) throws 
AvroSerdeException {
-    assert schema.getType().equals(Schema.Type.MAP);
-    Schema valueType = schema.getValueType();
+  private static TypeInfo generateMapTypeInfo(HoodieSchema schema,
+                                              Set<HoodieSchema> seenSchemas) 
throws AvroSerdeException {
+    ValidationUtils.checkArgument(schema.getType() == MAP, schema + " is not 
a MAP");
+    HoodieSchema valueType = schema.getValueType();
     TypeInfo ti = generateTypeInfo(valueType, seenSchemas);
 
     return 
TypeInfoFactory.getMapTypeInfo(TypeInfoFactory.getPrimitiveTypeInfo("string"), 
ti);
   }
 
-  private static TypeInfo generateArrayTypeInfo(Schema schema,
-                                                Set<Schema> seenSchemas) 
throws AvroSerdeException {
-    assert schema.getType().equals(Schema.Type.ARRAY);
-    Schema itemsType = schema.getElementType();
+  private static TypeInfo generateArrayTypeInfo(HoodieSchema schema,
+                                                Set<HoodieSchema> seenSchemas) 
throws AvroSerdeException {
+    ValidationUtils.checkArgument(schema.getType() == ARRAY, schema + " is not 
an ARRAY");
+    HoodieSchema itemsType = schema.getElementType();
     TypeInfo itemsTypeInfo = generateTypeInfo(itemsType, seenSchemas);
 
     return TypeInfoFactory.getListTypeInfo(itemsTypeInfo);
   }
 
-  private static TypeInfo generateUnionTypeInfo(Schema schema,
-                                                Set<Schema> seenSchemas) 
throws AvroSerdeException {
-    assert schema.getType().equals(Schema.Type.UNION);
-    List<Schema> types = schema.getTypes();
-
+  private static TypeInfo generateUnionTypeInfo(HoodieSchema schema,
+                                                Set<HoodieSchema> seenSchemas) 
throws AvroSerdeException {
+    ValidationUtils.checkArgument(schema.getType() == UNION, schema + " is not 
a UNION");
+    List<HoodieSchema> types = schema.getTypes();
 
-    List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(types.size());
+    List<TypeInfo> typeInfos = new ArrayList<>(types.size());
 
-    for (Schema type : types) {
+    for (HoodieSchema type : types) {
       typeInfos.add(generateTypeInfo(type, seenSchemas));
     }
 
@@ -324,8 +324,8 @@ public class HiveTypeUtils {
   // Hive doesn't have an Enum type, so we're going to treat them as Strings.
   // During the deserialize/serialize stage we'll check for enumness and
   // convert as such.
-  private static TypeInfo generateEnumTypeInfo(Schema schema) {
-    assert schema.getType().equals(Schema.Type.ENUM);
+  private static TypeInfo generateEnumTypeInfo(HoodieSchema schema) {
+    ValidationUtils.checkArgument(schema.getType() == ENUM, schema + " is not 
an ENUM");
 
     return TypeInfoFactory.getPrimitiveTypeInfo("string");
   }
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
index d3c8e186551e..7a6337778407 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
@@ -27,6 +27,9 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.hadoop.utils.HiveAvroSerializer;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -50,9 +53,9 @@ class TestHoodieHiveRecord {
     // Create a minimal HoodieHiveRecord instance with mocked dependencies
     HoodieKey key = new HoodieKey("test-key", "test-partition");
     ArrayWritable data = new ArrayWritable(Writable.class, new Writable[]{new 
Text("test")});
-    Schema schema = Schema.createRecord("TestRecord", null, null, false);
-    schema.setFields(Collections.singletonList(new Schema.Field("testField", 
Schema.create(Schema.Type.STRING), null, null)));
-    
+    HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, 
false,
+        Collections.singletonList(HoodieSchemaField.of("testField", 
HoodieSchema.create(HoodieSchemaType.STRING), null, null)));
+
     // Create HoodieHiveRecord with mocked dependencies
     hoodieHiveRecord = new HoodieHiveRecord(key, data, schema, new 
HiveAvroSerializer(schema));
   }
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java
index d22093c24223..16a237fe1f0e 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java
@@ -19,10 +19,9 @@
 package org.apache.hudi.hadoop.utils;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.exception.HoodieException;
 
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -84,24 +83,24 @@ public class TestHiveAvroSerializer {
 
   @Test
   public void testSerialize() {
-    Schema avroSchema = new Schema.Parser().parse(SIMPLE_SCHEMA);
-    // create a test record with avroSchema
-    GenericData.Record avroRecord = new GenericData.Record(avroSchema);
+    HoodieSchema schema = HoodieSchema.parse(SIMPLE_SCHEMA);
+    // create a test record with schema
+    GenericData.Record avroRecord = new 
GenericData.Record(schema.toAvroSchema());
     avroRecord.put("id", 1);
     avroRecord.put("col1", 1000L);
     avroRecord.put("col2", -5.001f);
     avroRecord.put("col3", 12.999d);
-    Schema currentDecimalType = 
avroSchema.getField("col4").schema().getTypes().get(1);
-    BigDecimal bd = new BigDecimal("123.456").setScale(((LogicalTypes.Decimal) 
currentDecimalType.getLogicalType()).getScale());
-    avroRecord.put("col4", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, 
currentDecimalType, currentDecimalType.getLogicalType()));
+    HoodieSchema.Decimal currentDecimalType = (HoodieSchema.Decimal) 
schema.getField("col4").get().schema().getTypes().get(1);
+    BigDecimal bd = new 
BigDecimal("123.456").setScale(currentDecimalType.getScale());
+    avroRecord.put("col4", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, 
currentDecimalType.toAvroSchema(), 
currentDecimalType.toAvroSchema().getLogicalType()));
     avroRecord.put("col5", "2011-01-01");
     avroRecord.put("col6", 18987);
     avroRecord.put("col7", 1640491505111222L);
     avroRecord.put("col8", false);
     ByteBuffer bb = ByteBuffer.wrap(new byte[]{97, 48, 53});
     avroRecord.put("col9", bb);
-    assertTrue(GenericData.get().validate(avroSchema, avroRecord));
-    ArrayWritable writable = (ArrayWritable) 
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, avroSchema, 
true);
+    assertTrue(GenericData.get().validate(schema.toAvroSchema(), avroRecord));
+    ArrayWritable writable = (ArrayWritable) 
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, 
schema.toAvroSchema(), true);
 
     List<Writable> writableList = 
Arrays.stream(writable.get()).collect(Collectors.toList());
     writableList.remove(writableList.size() - 1);
@@ -110,20 +109,20 @@ public class TestHiveAvroSerializer {
     List<TypeInfo> columnTypeList = 
createHiveTypeInfoFrom("int,bigint,float,double,decimal(10,4),string,date,timestamp,boolean,binary,date");
     List<String> columnNameList = 
createHiveColumnsFrom("id,col1,col2,col3,col4,col5,col6,col7,col8,col9,par");
     StructTypeInfo rowTypeInfo = (StructTypeInfo) 
TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
-    GenericRecord testRecord = new HiveAvroSerializer(new 
ArrayWritableObjectInspector(rowTypeInfo), columnNameList, 
columnTypeList).serialize(writable, avroSchema);
-    assertTrue(GenericData.get().validate(avroSchema, testRecord));
+    GenericRecord testRecord = new HiveAvroSerializer(new 
ArrayWritableObjectInspector(rowTypeInfo), columnNameList, 
columnTypeList).serialize(writable, schema);
+    assertTrue(GenericData.get().validate(schema.toAvroSchema(), testRecord));
     // test
     List<TypeInfo> columnTypeListClip = 
createHiveTypeInfoFrom("int,bigint,float,double,decimal(10,4),string,date,timestamp,boolean,binary");
     List<String> columnNameListClip = 
createHiveColumnsFrom("id,col1,col2,col3,col4,col5,col6,col7,col8,col9");
     StructTypeInfo rowTypeInfoClip = (StructTypeInfo) 
TypeInfoFactory.getStructTypeInfo(columnNameListClip, columnTypeListClip);
-    GenericRecord testRecordClip = new HiveAvroSerializer(new 
ArrayWritableObjectInspector(rowTypeInfoClip), columnNameListClip, 
columnTypeListClip).serialize(clipWritable, avroSchema);
-    assertTrue(GenericData.get().validate(avroSchema, testRecordClip));
+    GenericRecord testRecordClip = new HiveAvroSerializer(new 
ArrayWritableObjectInspector(rowTypeInfoClip), columnNameListClip, 
columnTypeListClip).serialize(clipWritable, schema);
+    assertTrue(GenericData.get().validate(schema.toAvroSchema(), 
testRecordClip));
   }
 
   @Test
   public void testNestedValueSerialize() {
-    Schema nestedSchema = new Schema.Parser().parse(NESTED_SCHEMA);
-    GenericRecord avroRecord = new GenericData.Record(nestedSchema);
+    HoodieSchema nestedSchema = HoodieSchema.parse(NESTED_SCHEMA);
+    GenericRecord avroRecord = new 
GenericData.Record(nestedSchema.toAvroSchema());
     avroRecord.put("firstname", "person1");
     avroRecord.put("lastname", "person2");
     GenericArray scores = new 
GenericData.Array<>(avroRecord.getSchema().getField("scores").schema(), 
Arrays.asList(1,2));
@@ -136,14 +135,14 @@ public class TestHiveAvroSerializer {
     GenericArray teachers = new 
GenericData.Array<>(avroRecord.getSchema().getField("teachers").schema(), 
Arrays.asList(studentRecord));
     avroRecord.put("teachers", teachers);
 
-    assertTrue(GenericData.get().validate(nestedSchema, avroRecord));
-    ArrayWritable writable = (ArrayWritable) 
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, nestedSchema, 
true);
+    assertTrue(GenericData.get().validate(nestedSchema.toAvroSchema(), 
avroRecord));
+    ArrayWritable writable = (ArrayWritable) 
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, 
nestedSchema.toAvroSchema(), true);
 
     List<TypeInfo> columnTypeList = 
createHiveTypeInfoFrom("string,string,array<int>,struct<firstname:string,lastname:string>,array<struct<firstname:string,lastname:string>>");
     List<String> columnNameList = 
createHiveColumnsFrom("firstname,lastname,arrayRecord,student,teachers");
     StructTypeInfo rowTypeInfo = (StructTypeInfo) 
TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
     GenericRecord testRecord = new HiveAvroSerializer(new 
ArrayWritableObjectInspector(rowTypeInfo), columnNameList, 
columnTypeList).serialize(writable, nestedSchema);
-    assertTrue(GenericData.get().validate(nestedSchema, testRecord));
+    assertTrue(GenericData.get().validate(nestedSchema.toAvroSchema(), 
testRecord));
   }
 
   private List<String> createHiveColumnsFrom(final String columnNamesStr) {
@@ -198,7 +197,7 @@ public class TestHiveAvroSerializer {
 
   @Test
   public void testGetTopLevelFields() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
 
     ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -216,7 +215,7 @@ public class TestHiveAvroSerializer {
 
   @Test
   public void testGetNestedFields() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
 
     ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -234,7 +233,7 @@ public class TestHiveAvroSerializer {
 
   @Test
   public void testInvalidFieldNameThrows() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
 
     ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -257,7 +256,7 @@ public class TestHiveAvroSerializer {
 
   @Test
   public void testGetValueFromArrayOrMap() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_ARRAY_AND_MAP);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_ARRAY_AND_MAP);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
 
     ArrayWritable tagsArray = new ArrayWritable(Text.class, new Text[]{
@@ -298,7 +297,7 @@ public class TestHiveAvroSerializer {
 
   @Test
   public void testGetJavaTopLevelFields() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
 
     ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -316,7 +315,7 @@ public class TestHiveAvroSerializer {
 
   @Test
   public void testGetJavaNestedFields() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
 
     ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -334,7 +333,7 @@ public class TestHiveAvroSerializer {
 
   @Test
   public void testGetJavaArrayAndMap() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_ARRAY_AND_MAP);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_ARRAY_AND_MAP);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
 
     ArrayWritable tagsArray = new ArrayWritable(Text.class, new Text[]{
@@ -381,7 +380,7 @@ public class TestHiveAvroSerializer {
 
   @Test
   public void testGetJavaInvalidFieldAccess() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_ARRAY_AND_MAP);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_ARRAY_AND_MAP);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
 
     ArrayWritable tagsArray = new ArrayWritable(Text.class, new Text[]{
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
index 1c5a9575ecd5..026a0cdbea61 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
@@ -80,14 +80,14 @@ public class TestHoodieArrayWritableSchemaUtils {
 
     //We reuse the ArrayWritable, so we need to get the values before 
projecting
     ArrayWritable record = 
convertArrayWritable(dataGen.generateGenericRecord());
-    HiveAvroSerializer fromSerializer = new 
HiveAvroSerializer(from.toAvroSchema());
+    HiveAvroSerializer fromSerializer = new HiveAvroSerializer(from);
     Object tripType = fromSerializer.getValue(record, "trip_type");
     Object currentTs = fromSerializer.getValue(record, "current_ts");
     Object weight = fromSerializer.getValue(record, "weight");
 
     //Make sure the projected fields can be read
     ArrayWritable projectedRecord = 
HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(record, from, to, 
Collections.emptyMap());
-    HiveAvroSerializer toSerializer = new 
HiveAvroSerializer(to.toAvroSchema());
+    HiveAvroSerializer toSerializer = new HiveAvroSerializer(to);
     assertEquals(tripType, toSerializer.getValue(projectedRecord, 
"trip_type"));
     assertEquals(currentTs, toSerializer.getValue(projectedRecord, 
"current_ts"));
     assertEquals(weight, toSerializer.getValue(projectedRecord, "weight"));
@@ -320,8 +320,8 @@ public class TestHoodieArrayWritableSchemaUtils {
       Writable newWritable,
       HoodieSchema newSchema
   ) throws AvroSerdeException {
-    TypeInfo oldTypeInfo = 
HiveTypeUtils.generateTypeInfo(oldSchema.toAvroSchema(), 
Collections.emptySet());
-    TypeInfo newTypeInfo = 
HiveTypeUtils.generateTypeInfo(newSchema.toAvroSchema(), 
Collections.emptySet());
+    TypeInfo oldTypeInfo = HiveTypeUtils.generateTypeInfo(oldSchema, 
Collections.emptySet());
+    TypeInfo newTypeInfo = HiveTypeUtils.generateTypeInfo(newSchema, 
Collections.emptySet());
 
     ObjectInspector oldObjectInspector = 
TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(oldTypeInfo);
     ObjectInspector newObjectInspector = 
TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(newTypeInfo);

Reply via email to