This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 91c15b10d5d4 feat(schema): phase 17 - Remove AvroSchemaUtils usage (part 2) (#17581)
91c15b10d5d4 is described below

commit 91c15b10d5d468b4756385a4d4db678b286373ba
Author: voonhous <[email protected]>
AuthorDate: Thu Dec 25 02:18:09 2025 +0800

    feat(schema): phase 17 - Remove AvroSchemaUtils usage (part 2) (#17581)
    
    * Remove AvroSchemaUtils from HiveAvroSerializer and HiveTypeUtils
    
    * Address comments
    
    * Address comments
    
    * Remove AvroSchemaUtils#resolveUnionSchema
    
    * Add more test to cover generateColumnTypes
    
    * Fix HiveTypeUtils#generateTypeInfo behaviour and ensure that it is not changed
    
    * Address comments
    
    * Increase Azure CI timeout
---
 azure-pipelines-20230430.yml                       |   4 +-
 .../hadoop/TestHoodieFileGroupReaderOnHive.java    |   2 +-
 .../org/apache/hudi/avro/AvroRecordContext.java    |  16 +-
 .../java/org/apache/hudi/avro/AvroSchemaUtils.java |  33 ---
 .../hudi/common/schema/HoodieSchemaUtils.java      |  42 ++++
 .../hudi/common/schema/TestHoodieSchemaUtils.java  | 171 ++++++++++++++
 .../hudi/hadoop/HiveHoodieReaderContext.java       |   2 +-
 .../org/apache/hudi/hadoop/HiveRecordContext.java  |  11 +-
 .../org/apache/hudi/hadoop/HoodieHiveRecord.java   |  11 +-
 .../realtime/RealtimeCompactedRecordReader.java    |   4 +-
 .../hudi/hadoop/utils/HiveAvroSerializer.java      | 126 +++++-----
 .../apache/hudi/hadoop/utils/HiveTypeUtils.java    | 189 ++++++++-------
 .../apache/hudi/hadoop/TestHoodieHiveRecord.java   |   9 +-
 .../hudi/hadoop/utils/TestHiveAvroSerializer.java  | 253 ++++++++++++++++++---
 .../utils/TestHoodieArrayWritableSchemaUtils.java  |   8 +-
 15 files changed, 642 insertions(+), 239 deletions(-)
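
As a reviewer reference, here is a minimal sketch of the field-lookup pattern
this series migrates to. It is not part of the commit; it only uses APIs that
appear in the diffs below (HoodieSchema.fromAvroSchema, getNonNullType, and the
Option-returning getField), and the class/method names are illustrative:

    import org.apache.avro.generic.IndexedRecord;
    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.schema.HoodieSchemaField;
    import org.apache.hudi.common.util.Option;

    class SchemaLookupSketch {
      // Resolve a top-level field via HoodieSchema instead of raw
      // org.apache.avro.Schema: nullable unions are unwrapped with
      // getNonNullType(), and a missing field surfaces as Option.empty()
      // rather than a null Schema.Field.
      static Object readField(IndexedRecord record, String fieldName) {
        HoodieSchema schema =
            HoodieSchema.fromAvroSchema(record.getSchema()).getNonNullType();
        Option<HoodieSchemaField> field = schema.getField(fieldName);
        return field.isEmpty() ? null : record.get(field.get().pos());
      }
    }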

diff --git a/azure-pipelines-20230430.yml b/azure-pipelines-20230430.yml
index 0afe13c64bb6..58fcbc22677d 100644
--- a/azure-pipelines-20230430.yml
+++ b/azure-pipelines-20230430.yml
@@ -180,7 +180,7 @@ stages:
             displayName: Top 100 long-running testcases
       - job: UT_FT_2
         displayName: FTA hudi-spark
-        timeoutInMinutes: '90'
+        timeoutInMinutes: '180'
         steps:
           - task: Maven@4
             displayName: maven install
@@ -492,7 +492,7 @@ stages:
             displayName: Top 100 long-running testcases
       - job: UT_FT_10
         displayName: UT FT common & other modules
-        timeoutInMinutes: '90'
+        timeoutInMinutes: '180'
         steps:
           - task: Docker@2
             displayName: "login to docker hub"
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
index 05d2c898f868..47b4114dea06 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
@@ -121,7 +121,7 @@ public class TestHoodieFileGroupReaderOnHive extends HoodieFileGroupReaderOnJava
     List<HoodieSchemaField> fields = schema.getFields();
     setHiveColumnNameProps(fields, jobConf, USE_FAKE_PARTITION);
     try {
-      String columnTypes = HiveTypeUtils.generateColumnTypes(schema.toAvroSchema()).stream().map(TypeInfo::getTypeName).collect(Collectors.joining(","));
+      String columnTypes = HiveTypeUtils.generateColumnTypes(schema).stream().map(TypeInfo::getTypeName).collect(Collectors.joining(","));
       jobConf.set("columns.types", columnTypes + ",string");
     } catch (SerDeException e) {
       throw new RuntimeException(e);
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
index 9fe1ea3f0650..947a292196ae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieEmptyRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.AvroJavaTypeConverter;
@@ -32,7 +33,6 @@ import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
@@ -70,22 +70,20 @@ public class AvroRecordContext extends RecordContext<IndexedRecord> {
   public static Object getFieldValueFromIndexedRecord(
       IndexedRecord record,
       String fieldName) {
-    Schema currentSchema = record.getSchema();
+    HoodieSchema currentSchema = HoodieSchema.fromAvroSchema(record.getSchema());
     IndexedRecord currentRecord = record;
     String[] path = fieldName.split("\\.");
     for (int i = 0; i < path.length; i++) {
-      if (currentSchema.isUnion()) {
-        currentSchema = AvroSchemaUtils.getNonNullTypeFromUnion(currentSchema);
-      }
-      Schema.Field field = currentSchema.getField(path[i]);
-      if (field == null) {
+      currentSchema = currentSchema.getNonNullType();
+      Option<HoodieSchemaField> fieldOpt = currentSchema.getField(path[i]);
+      if (fieldOpt.isEmpty()) {
         return null;
       }
-      Object value = currentRecord.get(field.pos());
+      Object value = currentRecord.get(fieldOpt.get().pos());
       if (i == path.length - 1) {
         return value;
       }
-      currentSchema = field.schema();
+      currentSchema = fieldOpt.get().schema();
       currentRecord = (IndexedRecord) value;
     }
     return null;
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index d9dd9869c69b..26fbd96b53df 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -477,39 +477,6 @@ public class AvroSchemaUtils {
     }
   }
 
-  /**
-   * Passed in {@code Union} schema and will try to resolve the field with the {@code fieldSchemaFullName}
-   * w/in the union returning its corresponding schema
-   *
-   * @param schema target schema to be inspected
-   * @param fieldSchemaFullName target field-name to be looked up w/in the union
-   * @return schema of the field w/in the union identified by the {@code fieldSchemaFullName}
-   */
-  public static Schema resolveUnionSchema(Schema schema, String fieldSchemaFullName) {
-    if (schema.getType() != Schema.Type.UNION) {
-      return schema;
-    }
-
-    List<Schema> innerTypes = schema.getTypes();
-    if (innerTypes.size() == 2 && isNullable(schema)) {
-      // this is a basic nullable field so handle it more efficiently
-      return getNonNullTypeFromUnion(schema);
-    }
-
-    Schema nonNullType =
-        innerTypes.stream()
-            .filter(it -> it.getType() != Schema.Type.NULL && Objects.equals(it.getFullName(), fieldSchemaFullName))
-            .findFirst()
-            .orElse(null);
-
-    if (nonNullType == null) {
-      throw new HoodieAvroSchemaException(
-          String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
-    }
-
-    return nonNullType;
-  }
-
   /**
    * Returns true in case provided {@link Schema} is nullable (ie accepting null values),
    * returns false otherwise
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
index cd0be22fd52e..bb54628e31af 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
 
 import org.apache.avro.JsonProperties;
 import org.apache.avro.Schema;
@@ -35,6 +36,7 @@ import java.math.RoundingMode;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -724,4 +726,44 @@ public final class HoodieSchemaUtils {
   private static boolean isSmallPrecisionDecimalField(HoodieSchema.Decimal decimal) {
     return decimal.getPrecision() <= 18;
   }
+
+  /**
+   * Resolves a union schema by finding the schema matching the given full name.
+   * Handles both simple nullable unions (null + non-null) and complex unions with multiple types.
+   *
+   * <p>This method supports the following union types:
+   * <ul>
+   *   <li>Simple nullable unions: {@code ["null", "Type"]} - returns the non-null type</li>
+   *   <li>Complex unions: {@code ["null", "TypeA", "TypeB"]} - returns the type matching fieldSchemaFullName</li>
+   *   <li>Non-union schemas - returns the schema as-is</li>
+   * </ul>
+   *
+   * @param schema the schema to resolve (may or may not be a union)
+   * @param fieldSchemaFullName the full name of the schema to find within the union
+   * @return the resolved schema
+   * @throws HoodieSchemaException if the union cannot be resolved or no matching type is found
+   */
+  public static HoodieSchema resolveUnionSchema(HoodieSchema schema, String fieldSchemaFullName) {
+    if (schema.getType() != HoodieSchemaType.UNION) {
+      return schema;
+    }
+
+    List<HoodieSchema> innerTypes = schema.getTypes();
+    if (innerTypes.size() == 2 && schema.isNullable()) {
+      // this is a basic nullable field so handle it more efficiently
+      return schema.getNonNullType();
+    }
+
+    HoodieSchema nonNullType = innerTypes.stream()
+        .filter(it -> it.getType() != HoodieSchemaType.NULL && Objects.equals(it.getFullName(), fieldSchemaFullName))
+        .findFirst()
+        .orElse(null);
+
+    if (nonNullType == null) {
+      throw new HoodieSchemaException(
+          String.format("Unsupported UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
+    }
+
+    return nonNullType;
+  }
 }
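
For context, a short usage sketch of the HoodieSchemaUtils#resolveUnionSchema
replacement added above, assuming the HoodieSchema factory methods exercised by
the tests in the next file:

    // Simple ["null", "string"] union: the fast path returns the non-null branch.
    HoodieSchema nullableString =
        HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING));
    HoodieSchema resolved =
        HoodieSchemaUtils.resolveUnionSchema(nullableString, "string");
    // resolved.getType() == HoodieSchemaType.STRING

    // Complex unions fall through to full-name matching: a record branch is
    // selected by its fully qualified name (e.g. "com.example.model.Person"),
    // and an unmatched name raises HoodieSchemaException.
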
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
index e29ee000f766..b5f649851fad 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
 
 import org.apache.avro.Schema;
 import org.junit.jupiter.api.Test;
@@ -1588,4 +1589,174 @@ public class TestHoodieSchemaUtils {
     assertFalse(HoodieSchemaUtils.hasSmallPrecisionDecimalField(HoodieSchema.parse(SCHEMA_WITH_AVRO_TYPES_STR)));
     assertFalse(HoodieSchemaUtils.hasSmallPrecisionDecimalField(HoodieSchema.parse(EXAMPLE_SCHEMA)));
   }
+
+  @Test
+  void testResolveUnionSchemaWithNonUnionSchema() {
+    // Non-union schemas should be returned as-is
+    HoodieSchema stringSchema = HoodieSchema.create(HoodieSchemaType.STRING);
+    HoodieSchema result = HoodieSchemaUtils.resolveUnionSchema(stringSchema, "any");
+
+    assertSame(stringSchema, result);
+  }
+
+  @Test
+  void testResolveUnionSchemaWithSimpleNullableUnion() {
+    // Simple nullable union: ["null", "string"] should return the non-null type efficiently
+    HoodieSchema nullableString = HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING));
+    HoodieSchema result = HoodieSchemaUtils.resolveUnionSchema(nullableString, "string");
+
+    assertEquals(HoodieSchemaType.STRING, result.getType());
+  }
+
+  @Test
+  void testResolveUnionSchemaWithSimpleNullableRecord() {
+    // Test with nullable record type
+    HoodieSchema personSchema = HoodieSchema.createRecord(
+        "Person",
+        null,
+        null,
+        Collections.singletonList(
+            HoodieSchemaField.of("name", HoodieSchema.create(HoodieSchemaType.STRING))
+        )
+    );
+
+    HoodieSchema nullablePerson = HoodieSchema.createNullable(personSchema);
+    HoodieSchema result = HoodieSchemaUtils.resolveUnionSchema(nullablePerson, "Person");
+
+    assertEquals(HoodieSchemaType.RECORD, result.getType());
+    assertEquals("Person", result.getName());
+    assertFalse(result.isNullable());
+  }
+
+  @Test
+  void testResolveUnionSchemaWithComplexUnionMatchingFullName() {
+    // Complex union with 3+ types, matching by fullName
+    String unionSchemaJson = "{"
+        + "\"type\":\"record\","
+        + "\"name\":\"Container\","
+        + "\"fields\":[{"
+        + "  \"name\":\"data\","
+        + "  \"type\":[\"null\","
+        + "    
{\"type\":\"record\",\"name\":\"PersonRecord\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]},"
+        + "    
{\"type\":\"record\",\"name\":\"CompanyRecord\",\"fields\":[{\"name\":\"companyName\",\"type\":\"string\"}]}"
+        + "  ]"
+        + "}]}";
+
+    HoodieSchema containerSchema = HoodieSchema.parse(unionSchemaJson);
+    HoodieSchema dataFieldSchema = containerSchema.getField("data").get().schema();
+
+    // Resolve to PersonRecord
+    HoodieSchema personResult = HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema, "PersonRecord");
+    assertEquals(HoodieSchemaType.RECORD, personResult.getType());
+    assertEquals("PersonRecord", personResult.getName());
+    assertFalse(personResult.isNullable());
+    assertTrue(personResult.getField("name").isPresent());
+
+    // Resolve to CompanyRecord
+    HoodieSchema companyResult = HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema, "CompanyRecord");
+    assertEquals(HoodieSchemaType.RECORD, companyResult.getType());
+    assertEquals("CompanyRecord", companyResult.getName());
+    assertFalse(companyResult.isNullable());
+    assertTrue(companyResult.getField("companyName").isPresent());
+  }
+
+  @Test
+  void testResolveUnionSchemaWithNonNullableTwoTypeUnion() {
+    // Union of two non-nullable types should use the complex resolution path
+    String unionSchemaJson = "{"
+        + "\"type\":\"record\","
+        + "\"name\":\"Container\","
+        + "\"fields\":[{"
+        + "  \"name\":\"data\","
+        + "  \"type\":["
+        + "    
{\"type\":\"record\",\"name\":\"TypeA\",\"fields\":[{\"name\":\"fieldA\",\"type\":\"string\"}]},"
+        + "    
{\"type\":\"record\",\"name\":\"TypeB\",\"fields\":[{\"name\":\"fieldB\",\"type\":\"int\"}]}"
+        + "  ]"
+        + "}]}";
+
+    HoodieSchema containerSchema = HoodieSchema.parse(unionSchemaJson);
+    HoodieSchema dataFieldSchema = containerSchema.getField("data").get().schema();
+
+    // Resolve to TypeA
+    HoodieSchema typeAResult = HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema, "TypeA");
+    assertEquals(HoodieSchemaType.RECORD, typeAResult.getType());
+    assertEquals("TypeA", typeAResult.getName());
+    assertFalse(typeAResult.isNullable());
+
+    // Resolve to TypeB
+    HoodieSchema typeBResult = HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema, "TypeB");
+    assertEquals(HoodieSchemaType.RECORD, typeBResult.getType());
+    assertEquals("TypeB", typeBResult.getName());
+    assertFalse(typeBResult.isNullable());
+  }
+
+  @Test
+  void testResolveUnionSchemaThrowsExceptionWhenNoMatch() {
+    // Complex union where the requested fullName doesn't match any type
+    String unionSchemaJson = "{"
+        + "\"type\":\"record\","
+        + "\"name\":\"Container\","
+        + "\"fields\":[{"
+        + "  \"name\":\"data\","
+        + "  \"type\":[\"null\","
+        + "    
{\"type\":\"record\",\"name\":\"PersonRecord\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]},"
+        + "    
{\"type\":\"record\",\"name\":\"CompanyRecord\",\"fields\":[{\"name\":\"companyName\",\"type\":\"string\"}]}"
+        + "  ]"
+        + "}]}";
+
+    HoodieSchema containerSchema = HoodieSchema.parse(unionSchemaJson);
+    HoodieSchema dataFieldSchema = containerSchema.getField("data").get().schema();
+
+    // Try to resolve to a type that doesn't exist in the union
+    HoodieSchemaException exception = assertThrows(
+        HoodieSchemaException.class,
+        () -> HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema, "AnimalRecord")
+    );
+
+    assertTrue(exception.getMessage().contains("Unsupported UNION type"));
+    assertTrue(exception.getMessage().contains("Only UNION of a null type and 
a non-null type is supported"));
+  }
+
+  @Test
+  void testResolveUnionSchemaWithNamespacedRecords() {
+    // Test with fully qualified names (with namespace)
+    String unionSchemaJson = "{"
+        + "\"type\":\"record\","
+        + "\"name\":\"Container\","
+        + "\"namespace\":\"com.example\","
+        + "\"fields\":[{"
+        + "  \"name\":\"data\","
+        + "  \"type\":[\"null\","
+        + "    
{\"type\":\"record\",\"name\":\"Person\",\"namespace\":\"com.example.model\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]},"
+        + "    
{\"type\":\"record\",\"name\":\"Company\",\"namespace\":\"com.example.model\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}"
+        + "  ]"
+        + "}]}";
+
+    HoodieSchema containerSchema = HoodieSchema.parse(unionSchemaJson);
+    HoodieSchema dataFieldSchema = containerSchema.getField("data").get().schema();
+
+    // Resolve using fully qualified name
+    HoodieSchema personResult = HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema, "com.example.model.Person");
+    assertEquals(HoodieSchemaType.RECORD, personResult.getType());
+    assertEquals("Person", personResult.getName());
+    assertFalse(personResult.isNullable());
+    assertEquals("com.example.model", personResult.getNamespace().get());
+
+    // Resolve Company
+    HoodieSchema companyResult = HoodieSchemaUtils.resolveUnionSchema(dataFieldSchema, "com.example.model.Company");
+    assertEquals(HoodieSchemaType.RECORD, companyResult.getType());
+    assertEquals("Company", companyResult.getName());
+    assertFalse(companyResult.isNullable());
+  }
+
+  @Test
+  void testResolveUnionSchemaWithPrimitiveTypes() {
+    // Test union containing primitive types (although less common)
+    // Union of null and string, but passed through the full name matching path
+    HoodieSchema nullableString = HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING));
+
+    // For simple 2-element nullable union, should use fast path
+    HoodieSchema result = HoodieSchemaUtils.resolveUnionSchema(nullableString, "string");
+    assertEquals(HoodieSchemaType.STRING, result.getType());
+  }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index 4614854d4df3..38cd64c13f37 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -101,7 +101,7 @@ public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable>
     jobConf.set(serdeConstants.LIST_COLUMNS, String.join(",", dataColumnNameList));
     List<TypeInfo> columnTypes;
     try {
-      columnTypes = HiveTypeUtils.generateColumnTypes(dataSchema.toAvroSchema());
+      columnTypes = HiveTypeUtils.generateColumnTypes(dataSchema);
     } catch (AvroSerdeException e) {
       throw new HoodieAvroSchemaException(String.format("Failed to generate hive column types from schema: %s, due to %s", dataSchema, e));
     }
     }
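
A minimal sketch of the new call shape, mirroring the change above: callers pass
the HoodieSchema directly instead of round-tripping through toAvroSchema()
(variable names here are illustrative):

    // generateColumnTypes throws AvroSerdeException on conversion problems.
    List<TypeInfo> columnTypes = HiveTypeUtils.generateColumnTypes(dataSchema);
    String columnTypesCsv = columnTypes.stream()
        .map(TypeInfo::getTypeName)
        .collect(Collectors.joining(","));
    jobConf.set("columns.types", columnTypesCsv);
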
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
index 8f52b290354b..bc12027921f6 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
@@ -31,7 +31,6 @@ import org.apache.hudi.hadoop.utils.HiveJavaTypeConverter;
 import org.apache.hudi.hadoop.utils.HoodieArrayWritableSchemaUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -56,9 +55,9 @@ public class HiveRecordContext extends RecordContext<ArrayWritable> {
     return FIELD_ACCESSOR_INSTANCE;
   }
 
-  private final Map<Schema, HiveAvroSerializer> serializerCache = new ConcurrentHashMap<>();
+  private final Map<HoodieSchema, HiveAvroSerializer> serializerCache = new ConcurrentHashMap<>();
 
-  private HiveAvroSerializer getHiveAvroSerializer(Schema schema) {
+  private HiveAvroSerializer getHiveAvroSerializer(HoodieSchema schema) {
     return serializerCache.computeIfAbsent(schema, HiveAvroSerializer::new);
   }
 
@@ -72,7 +71,7 @@ public class HiveRecordContext extends RecordContext<ArrayWritable> {
 
   @Override
   public Object getValue(ArrayWritable record, HoodieSchema schema, String fieldName) {
-    return getHiveAvroSerializer(schema.toAvroSchema()).getValue(record, fieldName);
+    return getHiveAvroSerializer(schema).getValue(record, fieldName);
+    return getHiveAvroSerializer(schema).getValue(record, fieldName);
   }
 
   @Override
@@ -92,7 +91,7 @@ public class HiveRecordContext extends RecordContext<ArrayWritable> {
     }
     HoodieSchema schema = getSchemaFromBufferRecord(bufferedRecord);
     ArrayWritable writable = bufferedRecord.getRecord();
-    return new HoodieHiveRecord(key, writable, schema.toAvroSchema(), getHiveAvroSerializer(schema.toAvroSchema()),
+    return new HoodieHiveRecord(key, writable, schema, getHiveAvroSerializer(schema),
         bufferedRecord.getHoodieOperation(), bufferedRecord.getOrderingValue(), bufferedRecord.isDelete());
   }
 
@@ -143,7 +142,7 @@ public class HiveRecordContext extends RecordContext<ArrayWritable> {
 
   @Override
   public GenericRecord convertToAvroRecord(ArrayWritable record, HoodieSchema schema) {
-    return getHiveAvroSerializer(schema.toAvroSchema()).serialize(record);
+    return getHiveAvroSerializer(schema).serialize(record);
   }
 
   @Override
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
index 1ee19c32cecf..a3f5f857dc57 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.read.DeleteContext;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.OrderingValues;
@@ -60,9 +61,9 @@ public class HoodieHiveRecord extends HoodieRecord<ArrayWritable> {
 
   private final HiveAvroSerializer avroSerializer;
 
-  protected Schema schema;
+  protected HoodieSchema schema;
 
-  public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, HiveAvroSerializer avroSerializer) {
+  public HoodieHiveRecord(HoodieKey key, ArrayWritable data, HoodieSchema schema, HiveAvroSerializer avroSerializer) {
     super(key, data);
     this.avroSerializer = avroSerializer;
     this.schema = schema;
@@ -70,7 +71,7 @@ public class HoodieHiveRecord extends HoodieRecord<ArrayWritable> {
     isDelete = data == null;
   }
 
-  public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, HiveAvroSerializer avroSerializer, HoodieOperation hoodieOperation, Comparable orderingValue, boolean isDelete) {
+  public HoodieHiveRecord(HoodieKey key, ArrayWritable data, HoodieSchema schema, HiveAvroSerializer avroSerializer, HoodieOperation hoodieOperation, Comparable orderingValue, boolean isDelete) {
     super(key, data, hoodieOperation, isDelete, Option.empty());
     this.orderingValue = orderingValue;
     this.avroSerializer = avroSerializer;
@@ -78,7 +79,7 @@ public class HoodieHiveRecord extends HoodieRecord<ArrayWritable> {
     this.copy = false;
   }
 
-  private HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, HoodieOperation operation, boolean isCopy,
+  private HoodieHiveRecord(HoodieKey key, ArrayWritable data, HoodieSchema schema, HoodieOperation operation, boolean isCopy,
                            HiveAvroSerializer avroSerializer) {
     super(key, data, operation, Option.empty());
     this.schema = schema;
@@ -246,7 +247,7 @@ public class HoodieHiveRecord extends HoodieRecord<ArrayWritable> {
     return avroSerializer.getValue(data, name);
   }
 
-  protected Schema getSchema() {
+  protected HoodieSchema getSchema() {
     return schema;
   }
 }
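
For reference, a HoodieHiveRecord is now constructed with a HoodieSchema end to
end (a sketch using the constructor signatures above; key, writable and
avroSchema are assumed inputs):

    HoodieSchema schema = HoodieSchema.fromAvroSchema(avroSchema);
    HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
    HoodieHiveRecord record = new HoodieHiveRecord(key, writable, schema, serializer);

Note that HiveRecordContext caches serializers in a map keyed by HoodieSchema
(serializerCache.computeIfAbsent(schema, HiveAvroSerializer::new)), so this
pattern relies on HoodieSchema providing value-based equals/hashCode.
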
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index fe0c91f6f3a1..d910145bd464 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -205,7 +205,7 @@ public class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
     // for presto engine, the hiveSchema will be: col1,col2, but the writerSchema will be col1,col2,par
     // so to be compatible with hive and presto, we should rewrite oldRecord before we call combineAndGetUpdateValue,
     // once presto on hudi have its own mor reader, we can remove the rewrite logical.
-    GenericRecord genericRecord = HiveAvroSerializer.rewriteRecordIgnoreResultCheck(oldRecord, getLogScannerReaderSchema().toAvroSchema());
+    GenericRecord genericRecord = HiveAvroSerializer.rewriteRecordIgnoreResultCheck(oldRecord, getLogScannerReaderSchema());
     RecordContext<IndexedRecord> recordContext = AvroRecordContext.getFieldAccessorInstance();
     BufferedRecord record = BufferedRecords.fromEngineRecord(genericRecord, HoodieSchema.fromAvroSchema(genericRecord.getSchema()), recordContext, orderingFields, newRecord.getRecordKey(), false);
     BufferedRecord newBufferedRecord = BufferedRecords.fromHoodieRecord(newRecord, HoodieSchema.fromAvroSchema(getLogScannerReaderSchema().toAvroSchema()),
@@ -218,7 +218,7 @@ public class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
   }
 
   private GenericRecord convertArrayWritableToHoodieRecord(ArrayWritable arrayWritable) {
-    GenericRecord record = serializer.serialize(arrayWritable, getHiveSchema().toAvroSchema());
+    GenericRecord record = serializer.serialize(arrayWritable, getHiveSchema());
     return record;
   }
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java
index 16690ac360b5..c0444a609283 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java
@@ -18,14 +18,15 @@
 
 package org.apache.hudi.hadoop.utils;
 
-import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieAvroSchemaException;
 import org.apache.hudi.exception.HoodieException;
 
-import org.apache.avro.JsonProperties;
-import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericEnumSymbol;
@@ -75,17 +76,16 @@ public class HiveAvroSerializer {
   private final List<String> columnNames;
   private final List<TypeInfo> columnTypes;
   private final ArrayWritableObjectInspector objectInspector;
-  private final Schema recordSchema;
+  private final HoodieSchema recordSchema;
 
   private static final Logger LOG = LoggerFactory.getLogger(HiveAvroSerializer.class);
 
-  public HiveAvroSerializer(Schema schema) {
-    schema = AvroSchemaUtils.getNonNullTypeFromUnion(schema);
-    if (schema.getType() != Schema.Type.RECORD) {
+  public HiveAvroSerializer(HoodieSchema schema) {
+    if (schema.getNonNullType().getType() != HoodieSchemaType.RECORD) {
       throw new IllegalArgumentException("Expected record schema, but got: " + schema);
     }
     this.recordSchema = schema;
-    this.columnNames = schema.getFields().stream().map(Schema.Field::name).map(String::toLowerCase).collect(Collectors.toList());
+    this.columnNames = schema.getFields().stream().map(HoodieSchemaField::name).map(String::toLowerCase).collect(Collectors.toList());
     try {
       this.columnTypes = HiveTypeUtils.generateColumnTypes(schema);
     } catch (AvroSerdeException e) {
@@ -171,7 +171,7 @@ public class HiveAvroSerializer {
             + "', but got " + context.typeInfo.getTypeName());
       }
 
-      if (!(context.schema.getType() == Schema.Type.RECORD)) {
+      if (!(context.schema.getType() == HoodieSchemaType.RECORD)) {
         throw new HoodieException("Expected RecordSchema while resolving '" + path[i]
             + "', but got " + context.schema.getType());
       }
@@ -184,15 +184,13 @@ public class HiveAvroSerializer {
   }
 
   private FieldContext extractFieldFromRecord(ArrayWritable record, StructObjectInspector structObjectInspector,
-                                              List<TypeInfo> fieldTypes, Schema schema, String fieldName) {
-    Schema.Field schemaField = schema.getField(fieldName);
-    if (schemaField == null) {
-      throw new HoodieException("Field '" + fieldName + "' not found in schema: " + schema);
-    }
+                                              List<TypeInfo> fieldTypes, HoodieSchema schema, String fieldName) {
+    HoodieSchemaField schemaField = schema.getField(fieldName)
+        .orElseThrow(() -> new HoodieException("Field '" + fieldName + "' not found in schema: " + schema));
 
     int fieldIdx = schemaField.pos();
     TypeInfo fieldTypeInfo = fieldTypes.get(fieldIdx);
-    Schema fieldSchema = AvroSchemaUtils.getNonNullTypeFromUnion(schemaField.schema());
+    HoodieSchema fieldSchema = schemaField.schema().getNonNullType();
 
     StructField structField = structObjectInspector.getStructFieldRef(fieldName);
     if (structField == null) {
@@ -216,9 +214,9 @@ public class HiveAvroSerializer {
     final TypeInfo typeInfo;
     final ObjectInspector objectInspector;
     final Object object;
-    final Schema schema;
+    final HoodieSchema schema;
 
-    FieldContext(Object object, ObjectInspector objectInspector, TypeInfo typeInfo,  Schema schema) {
+    FieldContext(Object object, ObjectInspector objectInspector, TypeInfo typeInfo,  HoodieSchema schema) {
       this.object = object;
       this.objectInspector = objectInspector;
       this.typeInfo = typeInfo;
@@ -226,7 +224,7 @@ public class HiveAvroSerializer {
     }
   }
 
-  private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING);
+  private static final HoodieSchema STRING_SCHEMA = HoodieSchema.create(HoodieSchemaType.STRING);
 
   public GenericRecord serialize(Object o) {
     if (recordSchema == null) {
@@ -235,10 +233,10 @@ public class HiveAvroSerializer {
     return serialize(o, recordSchema);
   }
 
-  public GenericRecord serialize(Object o, Schema schema) {
+  public GenericRecord serialize(Object o, HoodieSchema schema) {
 
     StructObjectInspector soi = objectInspector;
-    GenericData.Record record = new GenericData.Record(schema);
+    GenericData.Record record = new GenericData.Record(schema.toAvroSchema());
 
     List<? extends StructField> outputFieldRefs = soi.getAllStructFieldRefs();
     if (outputFieldRefs.size() != columnNames.size()) {
@@ -251,7 +249,7 @@ public class HiveAvroSerializer {
     List<Object> structFieldsDataAsList = soi.getStructFieldsDataAsList(o);
 
     for (int i = 0; i < size; i++) {
-      Schema.Field field = schema.getFields().get(i);
+      HoodieSchemaField field = schema.getFields().get(i);
       if (i >= columnTypes.size()) {
         break;
       }
@@ -268,28 +266,27 @@ public class HiveAvroSerializer {
     return record;
   }
 
-  private void setUpRecordFieldFromWritable(TypeInfo typeInfo, Object structFieldData, ObjectInspector fieldOI, GenericData.Record record, Schema.Field field) {
+  private void setUpRecordFieldFromWritable(TypeInfo typeInfo, Object structFieldData, ObjectInspector fieldOI, GenericData.Record record, HoodieSchemaField field) {
+    // In Avro/HoodieSchema, field.defaultVal() returns:
+    // - JsonProperties.Null / HoodieSchema.NULL_VALUE = if default is explicitly null
+    // - null / isEmpty() = if field has NO default value
+    // - some value = if field has an actual default
     Object val = serialize(typeInfo, fieldOI, structFieldData, field.schema());
-    if (val == null) {
-      if (field.defaultVal() instanceof JsonProperties.Null) {
-        record.put(field.name(), null);
-      } else {
-        record.put(field.name(), field.defaultVal());
-      }
-    } else {
-      record.put(field.name(), val);
-    }
+    Object recordValue = val != null ? val : field.defaultVal()
+        .map(defaultVal -> defaultVal == HoodieSchema.NULL_VALUE ? null : defaultVal)
+        .orElse(null);
+    record.put(field.name(), recordValue);
   }
 
-  private Object serialize(TypeInfo typeInfo, ObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+  private Object serialize(TypeInfo typeInfo, ObjectInspector fieldOI, Object structFieldData, HoodieSchema schema) throws HoodieException {
     if (null == structFieldData) {
       return null;
     }
 
-    schema = AvroSchemaUtils.getNonNullTypeFromUnion(schema);
+    schema = schema.getNonNullType();
 
     /* Because we use Hive's 'string' type when Avro calls for enum, we have to expressly check for enum-ness */
-    if (Schema.Type.ENUM.equals(schema.getType())) {
+    if (HoodieSchemaType.ENUM == schema.getType()) {
       assert fieldOI instanceof PrimitiveObjectInspector;
       return serializeEnum((PrimitiveObjectInspector) fieldOI, structFieldData, schema);
     }
@@ -339,48 +336,51 @@ public class HiveAvroSerializer {
     }
   };
 
-  private Object serializeEnum(PrimitiveObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+  private Object serializeEnum(PrimitiveObjectInspector fieldOI, Object structFieldData, HoodieSchema schema) throws HoodieException {
     try {
-      return enums.retrieve(schema).retrieve(serializePrimitive(fieldOI, structFieldData, schema));
+      return enums.retrieve(schema.toAvroSchema()).retrieve(serializePrimitive(fieldOI, structFieldData, schema));
     } catch (Exception e) {
       throw new HoodieException(e);
     }
   }
 
-  private Object serializeStruct(StructTypeInfo typeInfo, StructObjectInspector ssoi, Object o, Schema schema) {
+  private Object serializeStruct(StructTypeInfo typeInfo, StructObjectInspector ssoi, Object o, HoodieSchema schema) {
     int size = schema.getFields().size();
     List<? extends StructField> allStructFieldRefs = ssoi.getAllStructFieldRefs();
     List<Object> structFieldsDataAsList = ssoi.getStructFieldsDataAsList(o);
-    GenericData.Record record = new GenericData.Record(schema);
+    GenericData.Record record = new GenericData.Record(schema.toAvroSchema());
     ArrayList<TypeInfo> allStructFieldTypeInfos = typeInfo.getAllStructFieldTypeInfos();
 
     for (int i = 0; i < size; i++) {
-      Schema.Field field = schema.getFields().get(i);
+      HoodieSchemaField field = schema.getFields().get(i);
       setUpRecordFieldFromWritable(allStructFieldTypeInfos.get(i), structFieldsDataAsList.get(i),
           allStructFieldRefs.get(i).getFieldObjectInspector(), record, field);
     }
     return record;
   }
 
-  private Object serializePrimitive(PrimitiveObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+  private Object serializePrimitive(PrimitiveObjectInspector fieldOI, Object structFieldData, HoodieSchema schema) throws HoodieException {
     switch (fieldOI.getPrimitiveCategory()) {
       case BINARY:
-        if (schema.getType() == Schema.Type.BYTES) {
+        if (schema.getType() == HoodieSchemaType.BYTES) {
          return AvroSerdeUtils.getBufferFromBytes((byte[]) fieldOI.getPrimitiveJavaObject(structFieldData));
-        } else if (schema.getType() == Schema.Type.FIXED) {
-          GenericData.Fixed fixed = new GenericData.Fixed(schema, (byte[]) fieldOI.getPrimitiveJavaObject(structFieldData));
+        } else if (schema.getType() == HoodieSchemaType.FIXED) {
+          GenericData.Fixed fixed = new GenericData.Fixed(schema.toAvroSchema(), (byte[]) fieldOI.getPrimitiveJavaObject(structFieldData));
           return fixed;
         } else {
           throw new HoodieException("Unexpected Avro schema for Binary TypeInfo: " + schema.getType());
         }
       case DECIMAL:
         HiveDecimal dec = (HiveDecimal) fieldOI.getPrimitiveJavaObject(structFieldData);
-        LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) schema.getLogicalType();
+        if (schema.getType() != HoodieSchemaType.DECIMAL) {
+          throw new HoodieException("Unexpected schema type for DECIMAL: " + schema.getType());
+        }
+        HoodieSchema.Decimal decimal = (HoodieSchema.Decimal) schema;
         BigDecimal bd = new BigDecimal(dec.toString()).setScale(decimal.getScale());
-        if (schema.getType() == Schema.Type.BYTES) {
-          return HoodieAvroUtils.DECIMAL_CONVERSION.toBytes(bd, schema, decimal);
+        if (decimal.isFixed()) {
+          return HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, schema.toAvroSchema(), decimal.toAvroSchema().getLogicalType());
         } else {
-          return HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, schema, decimal);
+          return HoodieAvroUtils.DECIMAL_CONVERSION.toBytes(bd, schema.toAvroSchema(), decimal.toAvroSchema().getLogicalType());
         }
       case CHAR:
         HiveChar ch = (HiveChar) fieldOI.getPrimitiveJavaObject(structFieldData);
@@ -396,7 +396,7 @@ public class HiveAvroSerializer {
       case TIMESTAMP:
         return HoodieHiveUtils.getMills(structFieldData);
       case INT:
-        if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("date")) {
+        if (schema.getType() == HoodieSchemaType.DATE) {
           return new WritableDateObjectInspector().getPrimitiveWritableObject(structFieldData).getDays();
         }
         return fieldOI.getPrimitiveJavaObject(structFieldData);
@@ -409,7 +409,7 @@ public class HiveAvroSerializer {
     }
   }
 
-  private Object serializeUnion(UnionTypeInfo typeInfo, UnionObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+  private Object serializeUnion(UnionTypeInfo typeInfo, UnionObjectInspector fieldOI, Object structFieldData, HoodieSchema schema) throws HoodieException {
     byte tag = fieldOI.getTag(structFieldData);
 
     // Invariant that Avro's tag ordering must match Hive's.
@@ -419,20 +419,21 @@ public class HiveAvroSerializer {
         schema.getTypes().get(tag));
   }
 
-  private Object serializeList(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+  private Object serializeList(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, HoodieSchema schema) throws HoodieException {
     List<?> list = fieldOI.getList(structFieldData);
-    List<Object> deserialized = new GenericData.Array<Object>(list.size(), schema);
+    List<Object> deserialized = new GenericData.Array<>(list.size(), schema.toAvroSchema());
 
     TypeInfo listElementTypeInfo = typeInfo.getListElementTypeInfo();
     ObjectInspector listElementObjectInspector = fieldOI.getListElementObjectInspector();
     // NOTE: We have to resolve nullable schema, since Avro permits array elements
     //       to be null
-    Schema arrayNestedType = AvroSchemaUtils.getNonNullTypeFromUnion(schema.getElementType());
-    Schema elementType;
+    HoodieSchema arrayNestedType = schema.getElementType().getNonNullType();
+    HoodieSchema elementType;
     if (listElementObjectInspector.getCategory() == ObjectInspector.Category.PRIMITIVE) {
       elementType = arrayNestedType;
     } else {
-      elementType = arrayNestedType.getField("element") == null ? arrayNestedType : arrayNestedType.getField("element").schema();
+      elementType =
+          arrayNestedType.getField("element").map(HoodieSchemaField::schema).orElse(arrayNestedType);
     }
     for (int i = 0; i < list.size(); i++) {
       Object childFieldData = list.get(i);
@@ -445,7 +446,7 @@ public class HiveAvroSerializer {
     return deserialized;
   }
 
-  private Object serializeMap(MapTypeInfo typeInfo, MapObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+  private Object serializeMap(MapTypeInfo typeInfo, MapObjectInspector fieldOI, Object structFieldData, HoodieSchema schema) throws HoodieException {
     // Avro only allows maps with string keys
     if (!mapHasStringKey(fieldOI.getMapKeyObjectInspector())) {
       throw new HoodieException("Avro only supports maps with keys as Strings.  Current Map is: " + typeInfo.toString());
@@ -456,7 +457,7 @@ public class HiveAvroSerializer {
     TypeInfo mapKeyTypeInfo = typeInfo.getMapKeyTypeInfo();
     TypeInfo mapValueTypeInfo = typeInfo.getMapValueTypeInfo();
     Map<?, ?> map = fieldOI.getMap(structFieldData);
-    Schema valueType = schema.getValueType();
+    HoodieSchema valueType = schema.getValueType();
 
     Map<Object, Object> deserialized = new LinkedHashMap<Object, Object>(fieldOI.getMapSize(structFieldData));
 
@@ -473,10 +474,10 @@ public class HiveAvroSerializer {
         && ((PrimitiveObjectInspector) mapKeyObjectInspector).getPrimitiveCategory().equals(PrimitiveObjectInspector.PrimitiveCategory.STRING);
   }
 
-  public static GenericRecord rewriteRecordIgnoreResultCheck(GenericRecord oldRecord, Schema newSchema) {
-    GenericRecord newRecord = new GenericData.Record(newSchema);
+  public static GenericRecord rewriteRecordIgnoreResultCheck(GenericRecord oldRecord, HoodieSchema newSchema) {
+    GenericRecord newRecord = new GenericData.Record(newSchema.toAvroSchema());
     boolean isSpecificRecord = oldRecord instanceof SpecificRecordBase;
-    for (Schema.Field f : newSchema.getFields()) {
+    for (HoodieSchemaField f : newSchema.getFields()) {
       if (!(isSpecificRecord && isMetadataField(f.name()))) {
         copyOldValueOrSetDefault(oldRecord, newRecord, f);
       }
@@ -484,7 +485,7 @@ public class HiveAvroSerializer {
     return newRecord;
   }
 
-  private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field field) {
+  private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, HoodieSchemaField field) {
     Schema oldSchema = oldRecord.getSchema();
     Object fieldValue = oldSchema.getField(field.name()) == null ? null : oldRecord.get(field.name());
 
@@ -493,12 +494,13 @@ public class HiveAvroSerializer {
       Object newFieldValue;
       if (fieldValue instanceof GenericRecord) {
         GenericRecord record = (GenericRecord) fieldValue;
-        newFieldValue = rewriteRecordIgnoreResultCheck(record, AvroSchemaUtils.resolveUnionSchema(field.schema(), record.getSchema().getFullName()));
+        HoodieSchema fieldSchema = HoodieSchemaUtils.resolveUnionSchema(field.schema(), record.getSchema().getFullName());
+        newFieldValue = rewriteRecordIgnoreResultCheck(record, fieldSchema);
       } else {
         newFieldValue = fieldValue;
       }
       newRecord.put(field.name(), newFieldValue);
-    } else if (field.defaultVal() instanceof JsonProperties.Null) {
+    } else if (field.defaultVal() == HoodieSchema.NULL_VALUE) {
       newRecord.put(field.name(), null);
     } else {
       newRecord.put(field.name(), field.defaultVal());
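
The default-value handling above collapses the three Avro cases into one
expression once HoodieSchemaField#defaultVal() returns an Option (as in
setUpRecordFieldFromWritable): HoodieSchema.NULL_VALUE marks an explicitly null
default, and an empty Option means the field has no default at all:

    Object recordValue = val != null ? val : field.defaultVal()
        .map(d -> d == HoodieSchema.NULL_VALUE ? null : d)
        .orElse(null);
    record.put(field.name(), recordValue);
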
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveTypeUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveTypeUtils.java
index a5383b63ebb6..0c4dcc766472 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveTypeUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveTypeUtils.java
@@ -18,17 +18,17 @@
 
 package org.apache.hudi.hadoop.utils;
 
-import org.apache.hudi.avro.AvroSchemaUtils;
-
-import static org.apache.avro.Schema.Type.BOOLEAN;
-import static org.apache.avro.Schema.Type.BYTES;
-import static org.apache.avro.Schema.Type.DOUBLE;
-import static org.apache.avro.Schema.Type.FIXED;
-import static org.apache.avro.Schema.Type.FLOAT;
-import static org.apache.avro.Schema.Type.INT;
-import static org.apache.avro.Schema.Type.LONG;
-import static org.apache.avro.Schema.Type.NULL;
-import static org.apache.avro.Schema.Type.STRING;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.ValidationUtils;
+
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
+import org.apache.hadoop.hive.serde2.avro.InstanceCache;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -38,13 +38,24 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.avro.Schema;
-import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
-import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
-import org.apache.hadoop.hive.serde2.avro.InstanceCache;
-import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import static org.apache.hudi.common.schema.HoodieSchemaType.ARRAY;
+import static org.apache.hudi.common.schema.HoodieSchemaType.BOOLEAN;
+import static org.apache.hudi.common.schema.HoodieSchemaType.BYTES;
+import static org.apache.hudi.common.schema.HoodieSchemaType.DATE;
+import static org.apache.hudi.common.schema.HoodieSchemaType.DECIMAL;
+import static org.apache.hudi.common.schema.HoodieSchemaType.DOUBLE;
+import static org.apache.hudi.common.schema.HoodieSchemaType.ENUM;
+import static org.apache.hudi.common.schema.HoodieSchemaType.FIXED;
+import static org.apache.hudi.common.schema.HoodieSchemaType.FLOAT;
+import static org.apache.hudi.common.schema.HoodieSchemaType.INT;
+import static org.apache.hudi.common.schema.HoodieSchemaType.LONG;
+import static org.apache.hudi.common.schema.HoodieSchemaType.MAP;
+import static org.apache.hudi.common.schema.HoodieSchemaType.NULL;
+import static org.apache.hudi.common.schema.HoodieSchemaType.RECORD;
+import static org.apache.hudi.common.schema.HoodieSchemaType.STRING;
+import static org.apache.hudi.common.schema.HoodieSchemaType.TIME;
+import static org.apache.hudi.common.schema.HoodieSchemaType.TIMESTAMP;
+import static org.apache.hudi.common.schema.HoodieSchemaType.UNION;
 
 /**
  * Convert an Avro Schema to a Hive TypeInfo
@@ -66,9 +77,9 @@ public class HiveTypeUtils {
   //                  smallint
 
   // Map of Avro's primitive types to Hives (for those that are supported by both)
-  private static final Map<Schema.Type, TypeInfo> PRIMITIVE_TYPE_TO_TYPE_INFO = initTypeMap();
-  private static Map<Schema.Type, TypeInfo> initTypeMap() {
-    Map<Schema.Type, TypeInfo> theMap = new Hashtable<Schema.Type, TypeInfo>();
+  private static final Map<HoodieSchemaType, TypeInfo> PRIMITIVE_TYPE_TO_TYPE_INFO = initTypeMap();
+  private static Map<HoodieSchemaType, TypeInfo> initTypeMap() {
+    Map<HoodieSchemaType, TypeInfo> theMap = new Hashtable<>();
     theMap.put(NULL, TypeInfoFactory.getPrimitiveTypeInfo("void"));
     theMap.put(BOOLEAN, TypeInfoFactory.getPrimitiveTypeInfo("boolean"));
     theMap.put(INT, TypeInfoFactory.getPrimitiveTypeInfo("int"));
@@ -90,7 +101,7 @@ public class HiveTypeUtils {
    *         from the schema.
    * @throws AvroSerdeException for problems during conversion.
    */
-  public static List<TypeInfo> generateColumnTypes(Schema schema) throws AvroSerdeException {
+  public static List<TypeInfo> generateColumnTypes(HoodieSchema schema) throws AvroSerdeException {
     return generateColumnTypes(schema, null);
   }
 
@@ -105,27 +116,28 @@ public class HiveTypeUtils {
    *         from the schema.
    * @throws AvroSerdeException for problems during conversion.
    */
-  public static List<TypeInfo> generateColumnTypes(Schema schema,
-                                                   Set<Schema> seenSchemas) throws AvroSerdeException {
-    List<Schema.Field> fields = schema.getFields();
+  public static List<TypeInfo> generateColumnTypes(HoodieSchema schema,
+                                                   Set<HoodieSchema> seenSchemas) throws AvroSerdeException {
+    List<HoodieSchemaField> fields = schema.getFields();
 
     List<TypeInfo> types = new ArrayList<TypeInfo>(fields.size());
 
-    for (Schema.Field field : fields) {
+    for (HoodieSchemaField field : fields) {
       types.add(generateTypeInfo(field.schema(), seenSchemas));
     }
 
     return types;
   }
 
-  static InstanceCache<Schema, TypeInfo> typeInfoCache = new InstanceCache<Schema, TypeInfo>() {
+  static InstanceCache<HoodieSchema, TypeInfo> typeInfoCache = new InstanceCache<HoodieSchema, TypeInfo>() {
     @Override
-    protected TypeInfo makeInstance(Schema s,
-                                    Set<Schema> seenSchemas)
+    protected TypeInfo makeInstance(HoodieSchema s,
+                                    Set<HoodieSchema> seenSchemas)
         throws AvroSerdeException {
       return generateTypeInfoWorker(s, seenSchemas);
     }
   };
+
   /**
    * Convert an Avro Schema into an equivalent Hive TypeInfo.
    * @param schema to record. Must be of record type.
@@ -134,33 +146,24 @@ public class HiveTypeUtils {
    * @return TypeInfo matching the Avro schema
    * @throws AvroSerdeException for any problems during conversion.
    */
-  public static TypeInfo generateTypeInfo(Schema schema,
-                                          Set<Schema> seenSchemas) throws AvroSerdeException {
+  public static TypeInfo generateTypeInfo(HoodieSchema schema,
+                                          Set<HoodieSchema> seenSchemas) throws AvroSerdeException {
     // For bytes type, it can be mapped to decimal.
-    Schema.Type type = schema.getType();
-    // HUDI MODIFICATION ADDED "|| type == FIXED"
-    if ((type == BYTES || type == FIXED) && AvroSerDe.DECIMAL_TYPE_NAME
-        .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
-      int precision = 0;
-      int scale = 0;
-      try {
-        precision = getIntValue(schema.getObjectProp(AvroSerDe.AVRO_PROP_PRECISION));
-        scale = getIntValue(schema.getObjectProp(AvroSerDe.AVRO_PROP_SCALE));
-      } catch (Exception ex) {
-        throw new AvroSerdeException("Failed to obtain scale value from file schema: " + schema, ex);
-      }
-
+    HoodieSchemaType type = schema.getType();
+    if (type == DECIMAL) {
+      HoodieSchema.Decimal decimalSchema = (HoodieSchema.Decimal) schema;
+      int precision = decimalSchema.getPrecision();
+      int scale = decimalSchema.getScale();
       try {
         HiveDecimalUtils.validateParameter(precision, scale);
       } catch (Exception ex) {
         throw new AvroSerdeException("Invalid precision or scale for decimal type", ex);
       }
-
       return TypeInfoFactory.getDecimalTypeInfo(precision, scale);
     }
 
     if (type == STRING
-        && AvroSerDe.CHAR_TYPE_NAME.equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+        && AvroSerDe.CHAR_TYPE_NAME.equalsIgnoreCase((String) schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
       int maxLength = 0;
       try {
         maxLength = getIntFromSchema(schema, AvroSerDe.AVRO_PROP_MAX_LENGTH);
@@ -171,7 +174,7 @@ public class HiveTypeUtils {
     }
 
     if (type == STRING && AvroSerDe.VARCHAR_TYPE_NAME
-        .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+        .equalsIgnoreCase((String) schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
       int maxLength = 0;
       try {
         maxLength = getIntFromSchema(schema, AvroSerDe.AVRO_PROP_MAX_LENGTH);
@@ -181,14 +184,33 @@ public class HiveTypeUtils {
       return TypeInfoFactory.getVarcharTypeInfo(maxLength);
     }
 
-    if (type == INT
-        && AvroSerDe.DATE_TYPE_NAME.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+    if (type == DATE) {
       return TypeInfoFactory.dateTypeInfo;
     }
 
-    if (type == LONG
-        && AvroSerDe.TIMESTAMP_TYPE_NAME.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
-      return TypeInfoFactory.timestampTypeInfo;
+    if (type == TIMESTAMP) {
+      HoodieSchema.Timestamp timestampSchema = (HoodieSchema.Timestamp) schema;
+      switch (timestampSchema.getPrecision()) {
+        case MILLIS:
+          // Only millis precision is supported natively by Hive, check AvroSerDe for full support matrix by Hive
+          return TypeInfoFactory.timestampTypeInfo;
+        case MICROS:
+          return TypeInfoFactory.longTypeInfo;
+        default:
+          throw new IllegalArgumentException("Unsupported timestamp precision for Timestamp schema: " + timestampSchema.getPrecision());
+      }
+    }
+
+    if (type == TIME) {
+      HoodieSchema.Time timeSchema = (HoodieSchema.Time) schema;
+      switch (timeSchema.getPrecision()) {
+        case MILLIS:
+          return TypeInfoFactory.intTypeInfo;
+        case MICROS:
+          return TypeInfoFactory.longTypeInfo;
+        default:
+          throw new IllegalArgumentException("Unsupported time precision for 
Time schema: " + timeSchema.getPrecision());
+      }
     }
 
     return typeInfoCache.retrieve(schema, seenSchemas);
@@ -224,8 +246,8 @@ public class HiveTypeUtils {
   }
 
   // added this from AvroSerdeUtils in hive latest
-  public static int getIntFromSchema(Schema schema, String name) {
-    Object obj = schema.getObjectProp(name);
+  public static int getIntFromSchema(HoodieSchema schema, String name) {
+    Object obj = schema.getObjectProps().get(name);
     if (obj instanceof String) {
       return Integer.parseInt((String) obj);
     } else if (obj instanceof Integer) {
@@ -236,15 +258,15 @@ public class HiveTypeUtils {
     }
   }
 
-  private static TypeInfo generateTypeInfoWorker(Schema schema,
-                                                 Set<Schema> seenSchemas) throws AvroSerdeException {
-    // Avro requires NULLable types to be defined as unions of some type T
+  private static TypeInfo generateTypeInfoWorker(HoodieSchema schema,
+                                                 Set<HoodieSchema> seenSchemas) throws AvroSerdeException {
+    // HoodieSchema requires NULLable types to be defined as unions of some type T
     // and NULL.  This is annoying and we're going to hide it from the user.
-    if (AvroSchemaUtils.isNullable(schema)) {
-      return generateTypeInfo(AvroSchemaUtils.getNonNullTypeFromUnion(schema), 
seenSchemas);
+    if (schema.isNullable()) {
+      return generateTypeInfo(schema.getNonNullType(), seenSchemas);
     }
 
-    Schema.Type type = schema.getType();
+    HoodieSchemaType type = schema.getType();
     if (PRIMITIVE_TYPE_TO_TYPE_INFO.containsKey(type)) {
       return PRIMITIVE_TYPE_TO_TYPE_INFO.get(type);
     }
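
A sketch of the nullable-union unwrapping the worker now performs (hypothetical class name; HoodieSchema.parse on a union JSON is assumed to behave like Avro's Schema.Parser):

    import org.apache.hudi.common.schema.HoodieSchema;

    public class NullableUnwrapSketch {
      public static void main(String[] args) throws Exception {
        // ["null","int"] is how a nullable column is expressed; the worker strips
        // the NULL branch before consulting PRIMITIVE_TYPE_TO_TYPE_INFO.
        HoodieSchema nullableInt = HoodieSchema.parse("[\"null\",\"int\"]");
        System.out.println(nullableInt.isNullable());               // expected: true
        System.out.println(nullableInt.getNonNullType().getType()); // expected: INT
      }
    }
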
@@ -259,12 +281,12 @@ public class HiveTypeUtils {
     }
   }
 
-  private static TypeInfo generateRecordTypeInfo(Schema schema,
-                                                 Set<Schema> seenSchemas) throws AvroSerdeException {
-    assert schema.getType().equals(Schema.Type.RECORD);
+  private static TypeInfo generateRecordTypeInfo(HoodieSchema schema,
+                                                 Set<HoodieSchema> seenSchemas) throws AvroSerdeException {
+    ValidationUtils.checkArgument(schema.getType() == RECORD, () -> schema + " is not a RECORD");
 
     if (seenSchemas == null) {
-      seenSchemas = Collections.newSetFromMap(new IdentityHashMap<Schema, Boolean>());
+      seenSchemas = Collections.newSetFromMap(new IdentityHashMap<>());
     } else if (seenSchemas.contains(schema)) {
       throw new AvroSerdeException(
           "Recursive schemas are not supported. Recursive schema was " + schema
@@ -272,9 +294,9 @@ public class HiveTypeUtils {
     }
     seenSchemas.add(schema);
 
-    List<Schema.Field> fields = schema.getFields();
-    List<String> fieldNames = new ArrayList<String>(fields.size());
-    List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(fields.size());
+    List<HoodieSchemaField> fields = schema.getFields();
+    List<String> fieldNames = new ArrayList<>(fields.size());
+    List<TypeInfo> typeInfos = new ArrayList<>(fields.size());
 
     for (int i = 0; i < fields.size(); i++) {
       fieldNames.add(i, fields.get(i).name());
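
The identity-backed seen-set above is deliberate: structural equality would wrongly flag two equal-but-distinct nested schemas as recursion, while object identity only fires on a genuine cycle. A self-contained, JDK-only sketch of the distinction (hypothetical class name):

    import java.util.Collections;
    import java.util.IdentityHashMap;
    import java.util.Set;

    public class IdentitySetSketch {
      public static void main(String[] args) {
        // An identity-backed set treats equal-but-distinct objects as different,
        // so only revisiting the *same* schema object signals a recursive schema.
        Set<String> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        seen.add(new String("schema"));
        System.out.println(seen.contains(new String("schema"))); // false: different object
      }
    }
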
@@ -288,33 +310,32 @@ public class HiveTypeUtils {
   * Generate a TypeInfo for an Avro Map.  This is made slightly simpler in that
    * Avro only allows maps with strings for keys.
    */
-  private static TypeInfo generateMapTypeInfo(Schema schema,
-                                              Set<Schema> seenSchemas) throws AvroSerdeException {
-    assert schema.getType().equals(Schema.Type.MAP);
-    Schema valueType = schema.getValueType();
+  private static TypeInfo generateMapTypeInfo(HoodieSchema schema,
+                                              Set<HoodieSchema> seenSchemas) throws AvroSerdeException {
+    ValidationUtils.checkArgument(schema.getType() == MAP, () -> schema + " is not a MAP");
+    HoodieSchema valueType = schema.getValueType();
     TypeInfo ti = generateTypeInfo(valueType, seenSchemas);
 
     return TypeInfoFactory.getMapTypeInfo(TypeInfoFactory.getPrimitiveTypeInfo("string"), ti);
   }
 
-  private static TypeInfo generateArrayTypeInfo(Schema schema,
-                                                Set<Schema> seenSchemas) throws AvroSerdeException {
-    assert schema.getType().equals(Schema.Type.ARRAY);
-    Schema itemsType = schema.getElementType();
+  private static TypeInfo generateArrayTypeInfo(HoodieSchema schema,
+                                                Set<HoodieSchema> seenSchemas) throws AvroSerdeException {
+    ValidationUtils.checkArgument(schema.getType() == ARRAY, () -> schema + " is not an ARRAY");
+    HoodieSchema itemsType = schema.getElementType();
     TypeInfo itemsTypeInfo = generateTypeInfo(itemsType, seenSchemas);
 
     return TypeInfoFactory.getListTypeInfo(itemsTypeInfo);
   }
 
-  private static TypeInfo generateUnionTypeInfo(Schema schema,
-                                                Set<Schema> seenSchemas) throws AvroSerdeException {
-    assert schema.getType().equals(Schema.Type.UNION);
-    List<Schema> types = schema.getTypes();
-
+  private static TypeInfo generateUnionTypeInfo(HoodieSchema schema,
+                                                Set<HoodieSchema> seenSchemas) throws AvroSerdeException {
+    ValidationUtils.checkArgument(schema.getType() == UNION, () -> schema + " is not a UNION");
+    List<HoodieSchema> types = schema.getTypes();
 
-    List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(types.size());
+    List<TypeInfo> typeInfos = new ArrayList<>(types.size());
 
-    for (Schema type : types) {
+    for (HoodieSchema type : types) {
       typeInfos.add(generateTypeInfo(type, seenSchemas));
     }
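
Note that nullable unions never reach generateUnionTypeInfo, since the worker unwraps them first; only genuinely multi-typed unions land here. A sketch (hypothetical class name; the method's return statement falls outside this hunk, so the uniontype result shown is an assumption):

    import java.util.Collections;

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.hadoop.utils.HiveTypeUtils;

    public class UnionMappingSketch {
      public static void main(String[] args) throws Exception {
        // A two-branch non-nullable union maps each branch to its own TypeInfo.
        HoodieSchema union = HoodieSchema.parse("[\"int\",\"string\"]");
        System.out.println(HiveTypeUtils.generateTypeInfo(union, Collections.emptySet()));
        // expected (assumed): uniontype<int,string>
      }
    }
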
 
@@ -324,8 +345,8 @@ public class HiveTypeUtils {
   // Hive doesn't have an Enum type, so we're going to treat them as Strings.
   // During the deserialize/serialize stage we'll check for enumness and
   // convert as such.
-  private static TypeInfo generateEnumTypeInfo(Schema schema) {
-    assert schema.getType().equals(Schema.Type.ENUM);
+  private static TypeInfo generateEnumTypeInfo(HoodieSchema schema) {
+    ValidationUtils.checkArgument(schema.getType() == ENUM, () -> schema + " is not an ENUM");
 
     return TypeInfoFactory.getPrimitiveTypeInfo("string");
   }
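
For completeness, a sketch of the enum-to-string behavior (hypothetical class name, same assumptions as the sketches above):

    import java.util.Collections;

    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.hadoop.utils.HiveTypeUtils;

    public class EnumMappingSketch {
      public static void main(String[] args) throws Exception {
        // Hive has no enum type, so an Avro enum surfaces as a plain string column.
        HoodieSchema color = HoodieSchema.parse(
            "{\"type\":\"enum\",\"name\":\"Color\",\"symbols\":[\"RED\",\"GREEN\"]}");
        System.out.println(TypeInfoFactory.getPrimitiveTypeInfo("string")
            .equals(HiveTypeUtils.generateTypeInfo(color, Collections.emptySet()))); // expected: true
      }
    }
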
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
index d3c8e186551e..7a6337778407 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
@@ -27,6 +27,9 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.hadoop.utils.HiveAvroSerializer;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -50,9 +53,9 @@ class TestHoodieHiveRecord {
     // Create a minimal HoodieHiveRecord instance with mocked dependencies
     HoodieKey key = new HoodieKey("test-key", "test-partition");
     ArrayWritable data = new ArrayWritable(Writable.class, new Writable[]{new Text("test")});
-    Schema schema = Schema.createRecord("TestRecord", null, null, false);
-    schema.setFields(Collections.singletonList(new Schema.Field("testField", Schema.create(Schema.Type.STRING), null, null)));
-    
+    HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, false,
+        Collections.singletonList(HoodieSchemaField.of("testField", HoodieSchema.create(HoodieSchemaType.STRING), null, null)));
+
     // Create HoodieHiveRecord with mocked dependencies
     hoodieHiveRecord = new HoodieHiveRecord(key, data, schema, new HiveAvroSerializer(schema));
   }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java
index d22093c24223..d4dac0934a25 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java
@@ -19,15 +19,15 @@
 package org.apache.hudi.hadoop.utils;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.exception.HoodieException;
 
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -84,24 +84,24 @@ public class TestHiveAvroSerializer {
 
   @Test
   public void testSerialize() {
-    Schema avroSchema = new Schema.Parser().parse(SIMPLE_SCHEMA);
-    // create a test record with avroSchema
-    GenericData.Record avroRecord = new GenericData.Record(avroSchema);
+    HoodieSchema schema = HoodieSchema.parse(SIMPLE_SCHEMA);
+    // create a test record with schema
+    GenericData.Record avroRecord = new GenericData.Record(schema.toAvroSchema());
     avroRecord.put("id", 1);
     avroRecord.put("col1", 1000L);
     avroRecord.put("col2", -5.001f);
     avroRecord.put("col3", 12.999d);
-    Schema currentDecimalType = avroSchema.getField("col4").schema().getTypes().get(1);
-    BigDecimal bd = new BigDecimal("123.456").setScale(((LogicalTypes.Decimal) currentDecimalType.getLogicalType()).getScale());
-    avroRecord.put("col4", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, currentDecimalType, currentDecimalType.getLogicalType()));
+    HoodieSchema.Decimal currentDecimalType = (HoodieSchema.Decimal) schema.getField("col4").get().schema().getTypes().get(1);
+    BigDecimal bd = new BigDecimal("123.456").setScale(currentDecimalType.getScale());
+    avroRecord.put("col4", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, currentDecimalType.toAvroSchema(), currentDecimalType.toAvroSchema().getLogicalType()));
     avroRecord.put("col5", "2011-01-01");
     avroRecord.put("col6", 18987);
     avroRecord.put("col7", 1640491505111222L);
     avroRecord.put("col8", false);
     ByteBuffer bb = ByteBuffer.wrap(new byte[]{97, 48, 53});
     avroRecord.put("col9", bb);
-    assertTrue(GenericData.get().validate(avroSchema, avroRecord));
-    ArrayWritable writable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, avroSchema, true);
+    assertTrue(GenericData.get().validate(schema.toAvroSchema(), avroRecord));
+    ArrayWritable writable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, schema.toAvroSchema(), true);
 
     List<Writable> writableList = Arrays.stream(writable.get()).collect(Collectors.toList());
     writableList.remove(writableList.size() - 1);
@@ -110,20 +110,20 @@ public class TestHiveAvroSerializer {
     List<TypeInfo> columnTypeList = createHiveTypeInfoFrom("int,bigint,float,double,decimal(10,4),string,date,timestamp,boolean,binary,date");
     List<String> columnNameList = createHiveColumnsFrom("id,col1,col2,col3,col4,col5,col6,col7,col8,col9,par");
     StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
-    GenericRecord testRecord = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfo), columnNameList, columnTypeList).serialize(writable, avroSchema);
-    assertTrue(GenericData.get().validate(avroSchema, testRecord));
+    GenericRecord testRecord = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfo), columnNameList, columnTypeList).serialize(writable, schema);
+    assertTrue(GenericData.get().validate(schema.toAvroSchema(), testRecord));
     // test
     List<TypeInfo> columnTypeListClip = createHiveTypeInfoFrom("int,bigint,float,double,decimal(10,4),string,date,timestamp,boolean,binary");
     List<String> columnNameListClip = createHiveColumnsFrom("id,col1,col2,col3,col4,col5,col6,col7,col8,col9");
     StructTypeInfo rowTypeInfoClip = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameListClip, columnTypeListClip);
-    GenericRecord testRecordClip = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfoClip), columnNameListClip, columnTypeListClip).serialize(clipWritable, avroSchema);
-    assertTrue(GenericData.get().validate(avroSchema, testRecordClip));
+    GenericRecord testRecordClip = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfoClip), columnNameListClip, columnTypeListClip).serialize(clipWritable, schema);
+    assertTrue(GenericData.get().validate(schema.toAvroSchema(), testRecordClip));
   }
 
   @Test
   public void testNestedValueSerialize() {
-    Schema nestedSchema = new Schema.Parser().parse(NESTED_SCHEMA);
-    GenericRecord avroRecord = new GenericData.Record(nestedSchema);
+    HoodieSchema nestedSchema = HoodieSchema.parse(NESTED_SCHEMA);
+    GenericRecord avroRecord = new GenericData.Record(nestedSchema.toAvroSchema());
     avroRecord.put("firstname", "person1");
     avroRecord.put("lastname", "person2");
     GenericArray scores = new GenericData.Array<>(avroRecord.getSchema().getField("scores").schema(), Arrays.asList(1,2));
@@ -136,14 +136,14 @@ public class TestHiveAvroSerializer {
     GenericArray teachers = new GenericData.Array<>(avroRecord.getSchema().getField("teachers").schema(), Arrays.asList(studentRecord));
     avroRecord.put("teachers", teachers);
 
-    assertTrue(GenericData.get().validate(nestedSchema, avroRecord));
-    ArrayWritable writable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, nestedSchema, true);
+    assertTrue(GenericData.get().validate(nestedSchema.toAvroSchema(), avroRecord));
+    ArrayWritable writable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, nestedSchema.toAvroSchema(), true);
 
     List<TypeInfo> columnTypeList = createHiveTypeInfoFrom("string,string,array<int>,struct<firstname:string,lastname:string>,array<struct<firstname:string,lastname:string>>");
     List<String> columnNameList = createHiveColumnsFrom("firstname,lastname,arrayRecord,student,teachers");
     StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
     GenericRecord testRecord = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfo), columnNameList, columnTypeList).serialize(writable, nestedSchema);
-    assertTrue(GenericData.get().validate(nestedSchema, testRecord));
+    assertTrue(GenericData.get().validate(nestedSchema.toAvroSchema(), testRecord));
   }
 
   private List<String> createHiveColumnsFrom(final String columnNamesStr) {
@@ -198,7 +198,7 @@ public class TestHiveAvroSerializer {
 
   @Test
   public void testGetTopLevelFields() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
 
     ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -216,7 +216,7 @@ public class TestHiveAvroSerializer {
 
   @Test
   public void testGetNestedFields() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
 
     ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -234,7 +234,7 @@ public class TestHiveAvroSerializer {
 
   @Test
   public void testInvalidFieldNameThrows() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
 
     ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -257,7 +257,7 @@ public class TestHiveAvroSerializer {
 
   @Test
   public void testGetValueFromArrayOrMap() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_ARRAY_AND_MAP);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_ARRAY_AND_MAP);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
 
     ArrayWritable tagsArray = new ArrayWritable(Text.class, new Text[]{
@@ -298,7 +298,7 @@ public class TestHiveAvroSerializer {
 
   @Test
   public void testGetJavaTopLevelFields() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
 
     ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -316,7 +316,7 @@ public class TestHiveAvroSerializer {
 
   @Test
   public void testGetJavaNestedFields() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_RECORD);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_NESTED_RECORD);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
 
     ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
@@ -334,7 +334,7 @@ public class TestHiveAvroSerializer {
 
   @Test
   public void testGetJavaArrayAndMap() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_ARRAY_AND_MAP);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_ARRAY_AND_MAP);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
 
     ArrayWritable tagsArray = new ArrayWritable(Text.class, new Text[]{
@@ -381,7 +381,7 @@ public class TestHiveAvroSerializer {
 
   @Test
   public void testGetJavaInvalidFieldAccess() {
-    Schema schema = new Schema.Parser().parse(SCHEMA_WITH_ARRAY_AND_MAP);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA_WITH_ARRAY_AND_MAP);
     HiveAvroSerializer serializer = new HiveAvroSerializer(schema);
 
     ArrayWritable tagsArray = new ArrayWritable(Text.class, new Text[]{
@@ -409,4 +409,203 @@ public class TestHiveAvroSerializer {
       serializer.getValueAsJava(record, "properties.value");
     });
   }
+
+  @Test
+  public void testSerializeDecimalBackedByBytes() {
+    // Create schema with BYTES-backed decimal (not FIXED)
+    String schemaWithBytesDecimal = "{\"type\":\"record\",\"name\":\"test_record\",\"fields\":["
+        + "{\"name\":\"id\",\"type\":\"int\"},"
+        + "{\"name\":\"amount\",\"type\":[\"null\",{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}],\"default\":null}"
+        + "]}";
+
+    HoodieSchema schema = HoodieSchema.parse(schemaWithBytesDecimal);
+
+    // Create an Avro record with BYTES-backed decimal
+    GenericData.Record avroRecord = new GenericData.Record(schema.toAvroSchema());
+    avroRecord.put("id", 42);
+
+    HoodieSchema.Decimal decimalType = (HoodieSchema.Decimal) schema.getField("amount").get().schema().getTypes().get(1);
+    BigDecimal bd = new BigDecimal("1234.56").setScale(decimalType.getScale());
+    ByteBuffer decimalBytes = HoodieAvroUtils.DECIMAL_CONVERSION.toBytes(bd, decimalType.toAvroSchema(), decimalType.toAvroSchema().getLogicalType());
+    avroRecord.put("amount", decimalBytes);
+
+    assertTrue(GenericData.get().validate(schema.toAvroSchema(), avroRecord));
+
+    // Convert to ArrayWritable
+    ArrayWritable writable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, schema.toAvroSchema(), true);
+
+    // Set up Hive types and serializer
+    List<TypeInfo> columnTypeList = createHiveTypeInfoFrom("int,decimal(10,2)");
+    List<String> columnNameList = createHiveColumnsFrom("id,amount");
+    StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+
+    // Serialize and verify
+    GenericRecord testRecord = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfo), columnNameList, columnTypeList).serialize(writable, schema);
+    assertTrue(GenericData.get().validate(schema.toAvroSchema(), testRecord));
+
+    // Verify the decimal value is correctly serialized
+    assertEquals(42, testRecord.get("id"));
+    ByteBuffer resultBytes = (ByteBuffer) testRecord.get("amount");
+    BigDecimal resultDecimal = HoodieAvroUtils.DECIMAL_CONVERSION.fromBytes(resultBytes, decimalType.toAvroSchema(), decimalType.toAvroSchema().getLogicalType());
+    assertEquals(bd, resultDecimal);
+  }
+
+  @Test
+  public void testSerializeDecimalBackedByFixed() {
+    // Create schema with FIXED-backed decimal (the existing test above covers this, but make it explicit)
+    String schemaWithFixedDecimal = "{\"type\":\"record\",\"name\":\"test_record\",\"fields\":["
+        + "{\"name\":\"id\",\"type\":\"int\"},"
+        + "{\"name\":\"amount\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"fixed_decimal\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}],\"default\":null}"
+        + "]}";
+
+    HoodieSchema schema = HoodieSchema.parse(schemaWithFixedDecimal);
+
+    // Create an Avro record with FIXED-backed decimal
+    GenericData.Record avroRecord = new GenericData.Record(schema.toAvroSchema());
+    avroRecord.put("id", 42);
+
+    HoodieSchema.Decimal decimalType = (HoodieSchema.Decimal) schema.getField("amount").get().schema().getTypes().get(1);
+    BigDecimal bd = new BigDecimal("1234.56").setScale(decimalType.getScale());
+    avroRecord.put("amount", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, decimalType.toAvroSchema(), decimalType.toAvroSchema().getLogicalType()));
+
+    assertTrue(GenericData.get().validate(schema.toAvroSchema(), avroRecord));
+
+    // Convert to ArrayWritable
+    ArrayWritable writable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, schema.toAvroSchema(), true);
+
+    // Set up Hive types and serializer
+    List<TypeInfo> columnTypeList = createHiveTypeInfoFrom("int,decimal(10,2)");
+    List<String> columnNameList = createHiveColumnsFrom("id,amount");
+    StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+
+    // Serialize and verify
+    GenericRecord testRecord = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfo), columnNameList, columnTypeList).serialize(writable, schema);
+    assertTrue(GenericData.get().validate(schema.toAvroSchema(), testRecord));
+
+    // Verify the decimal value is correctly serialized
+    assertEquals(42, testRecord.get("id"));
+    GenericData.Fixed resultFixed = (GenericData.Fixed) testRecord.get("amount");
+    BigDecimal resultDecimal = HoodieAvroUtils.DECIMAL_CONVERSION.fromFixed(resultFixed, decimalType.toAvroSchema(), decimalType.toAvroSchema().getLogicalType());
+    assertEquals(bd, resultDecimal);
+  }
+
+  @Test
+  public void testGenerateColumnTypesForDecimalBackedByBytes() throws AvroSerdeException {
+    // Test HiveTypeUtils.generateColumnTypes and convertToTypeInfo branch at lines 152-162 for decimal backed by bytes
+    String schemaWithDecimalBytes = "{\"type\":\"record\",\"name\":\"test_record\",\"fields\":["
+        + "{\"name\":\"id\",\"type\":\"int\"},"
+        + "{\"name\":\"amount\",\"type\":[\"null\",{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}],\"default\":null}"
+        + "]}";
+
+    HoodieSchema schema = HoodieSchema.parse(schemaWithDecimalBytes);
+
+    // Test that HiveTypeUtils.generateColumnTypes correctly identifies bytes-backed decimal as decimal type
+    List<TypeInfo> columnTypes = HiveTypeUtils.generateColumnTypes(schema);
+    assertEquals(2, columnTypes.size());
+    assertEquals(TypeInfoFactory.intTypeInfo, columnTypes.get(0));
+    // The second column should be decimal(10,2) type due to the decimal logical type backed by bytes
+    assertEquals(TypeInfoFactory.getDecimalTypeInfo(10, 2), columnTypes.get(1));
+  }
+
+  @Test
+  public void testGenerateColumnTypesForDecimalBackedByFixed() throws AvroSerdeException {
+    // Test HiveTypeUtils.generateColumnTypes and convertToTypeInfo branch at lines 152-162 for decimal backed by fixed
+    String schemaWithDecimalFixed = "{\"type\":\"record\",\"name\":\"test_record\",\"fields\":["
+        + "{\"name\":\"id\",\"type\":\"int\"},"
+        + "{\"name\":\"amount\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"fixed_decimal\",\"size\":6,\"logicalType\":\"decimal\",\"precision\":12,\"scale\":4}],\"default\":null}"
+        + "]}";
+
+    HoodieSchema schema = HoodieSchema.parse(schemaWithDecimalFixed);
+    assertInstanceOf(HoodieSchema.Decimal.class, schema.getField("amount").get().getNonNullSchema());
+
+    // Test that HiveTypeUtils.generateColumnTypes correctly identifies fixed-backed decimal as decimal type
+    List<TypeInfo> columnTypes = HiveTypeUtils.generateColumnTypes(schema);
+    assertEquals(2, columnTypes.size());
+    assertEquals(TypeInfoFactory.intTypeInfo, columnTypes.get(0));
+    // The second column should be decimal(12,4) type due to the decimal logical type backed by fixed
+    assertEquals(TypeInfoFactory.getDecimalTypeInfo(12, 4), columnTypes.get(1));
+  }
+
+  @Test
+  public void testGenerateColumnTypesForDate() throws AvroSerdeException {
+    // Test HiveTypeUtils.generateColumnTypes and convertToTypeInfo branch at lines 187-189 for date
+    String schemaWithDate = "{\"type\":\"record\",\"name\":\"test_record\",\"fields\":["
+        + "{\"name\":\"id\",\"type\":\"int\"},"
+        + "{\"name\":\"birth_date\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}"
+        + "]}";
+
+    HoodieSchema schema = HoodieSchema.parse(schemaWithDate);
+
+    // Test that HiveTypeUtils.generateColumnTypes correctly identifies date as date type
+    List<TypeInfo> columnTypes = HiveTypeUtils.generateColumnTypes(schema);
+    assertEquals(2, columnTypes.size());
+    assertEquals(TypeInfoFactory.intTypeInfo, columnTypes.get(0));
+    // The second column should be date type due to the date logical type
+    assertEquals(TypeInfoFactory.dateTypeInfo, columnTypes.get(1));
+  }
+
+  @Test
+  public void testGenerateColumnTypesForTimestampMillis() throws AvroSerdeException {
+    // Test HiveTypeUtils.generateColumnTypes and convertToTypeInfo branch at lines 192-194 for timestamp-millis
+    String schemaWithTimestampMillis = "{\"type\":\"record\",\"name\":\"test_record\",\"fields\":["
+        + "{\"name\":\"id\",\"type\":\"int\"},"
+        + "{\"name\":\"created_at\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}"
+        + "]}";
+
+    HoodieSchema schema = HoodieSchema.parse(schemaWithTimestampMillis);
+
+    // Test that HiveTypeUtils.generateColumnTypes correctly identifies timestamp-millis as timestamp type
+    List<TypeInfo> columnTypes = HiveTypeUtils.generateColumnTypes(schema);
+    assertEquals(2, columnTypes.size());
+    assertEquals(TypeInfoFactory.intTypeInfo, columnTypes.get(0));
+    // The second column should be timestamp type due to the timestamp-millis logical type
+    assertEquals(TypeInfoFactory.timestampTypeInfo, columnTypes.get(1));
+  }
+
+  @Test
+  public void testGenerateColumnTypesForTimestampMicros() throws AvroSerdeException {
+    // Test timestamp-micros - AvroSerDe.TIMESTAMP_TYPE_NAME is only "timestamp-millis", NOT "timestamp-micros"
+    String schemaWithTimestampMicros = "{\"type\":\"record\",\"name\":\"test_record\",\"fields\":["
+        + "{\"name\":\"id\",\"type\":\"int\"},"
+        + "{\"name\":\"updated_at\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}],\"default\":null}"
+        + "]}";
+
+    HoodieSchema schema = HoodieSchema.parse(schemaWithTimestampMicros);
+
+    List<TypeInfo> columnTypes = HiveTypeUtils.generateColumnTypes(schema);
+    assertEquals(TypeInfoFactory.intTypeInfo, columnTypes.get(0));
+    // The second column should be bigint type
+    assertEquals(TypeInfoFactory.longTypeInfo, columnTypes.get(1));
+  }
+
+  @Test
+  public void testGenerateColumnTypesForTimeMillis() throws AvroSerdeException {
+    // Test the time-millis logical type - the TIME branch in HiveTypeUtils maps millis precision to int
+    String schemaWithTimeMillis = "{\"type\":\"record\",\"name\":\"test_record\",\"fields\":["
+        + "{\"name\":\"id\",\"type\":\"int\"},"
+        + "{\"name\":\"event_time\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"time-millis\"}],\"default\":null}"
+        + "]}";
+
+    HoodieSchema schema = HoodieSchema.parse(schemaWithTimeMillis);
+    List<TypeInfo> columnTypes = HiveTypeUtils.generateColumnTypes(schema);
+    assertEquals(TypeInfoFactory.intTypeInfo, columnTypes.get(0));
+    // The second column should be int type
+    assertEquals(TypeInfoFactory.intTypeInfo, columnTypes.get(1));
+  }
+
+  @Test
+  public void testGenerateColumnTypesForTimeMicros() throws AvroSerdeException {
+    // Test the time-micros logical type
+    String schemaWithTimeMicros = "{\"type\":\"record\",\"name\":\"test_record\",\"fields\":["
+        + "{\"name\":\"id\",\"type\":\"int\"},"
+        + "{\"name\":\"event_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"time-micros\"}],\"default\":null}"
+        + "]}";
+
+    HoodieSchema schema = HoodieSchema.parse(schemaWithTimeMicros);
+    List<TypeInfo> columnTypes = HiveTypeUtils.generateColumnTypes(schema);
+    assertEquals(TypeInfoFactory.intTypeInfo, columnTypes.get(0));
+    // The second column should be bigint type
+    assertEquals(TypeInfoFactory.longTypeInfo, columnTypes.get(1));
+  }
 }
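
The two new decimal tests exercise the fact that the same logical decimal(10,2) can be physically BYTES- or FIXED-backed. A standalone sketch of the bytes round trip using only the plain Avro API, no Hudi involved (hypothetical class name):

    import java.math.BigDecimal;
    import java.nio.ByteBuffer;

    import org.apache.avro.Conversions;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    public class DecimalEncodingSketch {
      public static void main(String[] args) {
        // Encode a decimal(10,2) into its BYTES representation and read it back.
        Conversions.DecimalConversion conv = new Conversions.DecimalConversion();
        Schema bytesSchema = LogicalTypes.decimal(10, 2)
            .addToSchema(Schema.create(Schema.Type.BYTES));
        BigDecimal bd = new BigDecimal("1234.56");
        ByteBuffer buf = conv.toBytes(bd, bytesSchema, bytesSchema.getLogicalType());
        System.out.println(conv.fromBytes(buf, bytesSchema, bytesSchema.getLogicalType())); // 1234.56
      }
    }
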
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
index 1c5a9575ecd5..026a0cdbea61 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
@@ -80,14 +80,14 @@ public class TestHoodieArrayWritableSchemaUtils {
 
     //We reuse the ArrayWritable, so we need to get the values before projecting
     ArrayWritable record = convertArrayWritable(dataGen.generateGenericRecord());
-    HiveAvroSerializer fromSerializer = new HiveAvroSerializer(from.toAvroSchema());
+    HiveAvroSerializer fromSerializer = new HiveAvroSerializer(from);
     Object tripType = fromSerializer.getValue(record, "trip_type");
     Object currentTs = fromSerializer.getValue(record, "current_ts");
     Object weight = fromSerializer.getValue(record, "weight");
 
     //Make sure the projected fields can be read
     ArrayWritable projectedRecord = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(record, from, to, Collections.emptyMap());
-    HiveAvroSerializer toSerializer = new HiveAvroSerializer(to.toAvroSchema());
+    HiveAvroSerializer toSerializer = new HiveAvroSerializer(to);
     assertEquals(tripType, toSerializer.getValue(projectedRecord, "trip_type"));
     assertEquals(currentTs, toSerializer.getValue(projectedRecord, "current_ts"));
     assertEquals(weight, toSerializer.getValue(projectedRecord, "weight"));
@@ -320,8 +320,8 @@ public class TestHoodieArrayWritableSchemaUtils {
       Writable newWritable,
       HoodieSchema newSchema
   ) throws AvroSerdeException {
-    TypeInfo oldTypeInfo = HiveTypeUtils.generateTypeInfo(oldSchema.toAvroSchema(), Collections.emptySet());
-    TypeInfo newTypeInfo = HiveTypeUtils.generateTypeInfo(newSchema.toAvroSchema(), Collections.emptySet());
+    TypeInfo oldTypeInfo = HiveTypeUtils.generateTypeInfo(oldSchema, Collections.emptySet());
+    TypeInfo newTypeInfo = HiveTypeUtils.generateTypeInfo(newSchema, Collections.emptySet());
 
     ObjectInspector oldObjectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(oldTypeInfo);
     ObjectInspector newObjectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(newTypeInfo);
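
A condensed sketch of the Schema to TypeInfo to ObjectInspector chain this helper now builds directly from HoodieSchema, without the toAvroSchema() hop (hypothetical class name):

    import java.util.Collections;

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.hadoop.utils.HiveTypeUtils;

    public class InspectorChainSketch {
      public static void main(String[] args) throws Exception {
        // Parse a one-field record, derive its Hive TypeInfo, then an inspector.
        HoodieSchema s = HoodieSchema.parse(
            "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
        TypeInfo ti = HiveTypeUtils.generateTypeInfo(s, Collections.emptySet());
        ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(ti);
        System.out.println(ti.getTypeName()); // expected: struct<id:int>
      }
    }
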
