This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 04cfe3272fb0 feat(schema): Migrate hudi-flink to use HoodieSchema instead of avro Schema (#14355)
04cfe3272fb0 is described below

commit 04cfe3272fb03663a5e280ad4f338bb44e3821ce
Author: Rahil C <[email protected]>
AuthorDate: Tue Dec 9 10:49:10 2025 -0800

    feat(schema): Migrate hudi-flink to use HoodieSchema instead of avro Schema (#14355)
    
    * Initial flink schema changes
    
    * checkstyle fixes
    
    * fix compilation issues
    
    * add converter
    
    * address tim comments, add converter for hoodie schema to flink types
    
    * fix clustering IT
    
    * address comments
    
    * start with hoodie schema for certain tests when converting to flink data type
    
    * use helper from TableSchemaResolver
    
    * address all comments from tim and danny
    
    * address tim comment on single nullable check
    
    * resolve minor things from merge
    
    * address tim minor comments
---
 .../apache/hudi/util/HoodieSchemaConverter.java    | 461 ++++++++++++++++++
 .../hudi/util/TestHoodieSchemaConverter.java       | 531 +++++++++++++++++++++
 .../apache/hudi/common/schema/HoodieSchema.java    |  11 +
 .../hudi/common/schema/HoodieSchemaUtils.java      |  37 ++
 .../hudi/common/schema/TestHoodieSchema.java       |   3 +-
 .../hudi/sink/bootstrap/BootstrapOperator.java     |   7 +-
 .../hudi/sink/clustering/ClusteringOperator.java   |  16 +-
 .../sink/clustering/HoodieFlinkClusteringJob.java  |   8 +-
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  |   4 +-
 .../org/apache/hudi/table/HoodieTableSource.java   |  44 +-
 .../hudi/table/catalog/HoodieHiveCatalog.java      |  10 +-
 .../table/format/FlinkRowDataReaderContext.java    |   2 +-
 .../org/apache/hudi/table/format/FormatUtils.java  |  17 +-
 .../table/format/HoodieRowDataParquetReader.java   |  19 +-
 .../apache/hudi/table/format/RecordIterators.java  |   7 +-
 .../hudi/table/format/cdc/CdcInputFormat.java      |  27 +-
 .../table/format/mor/MergeOnReadInputFormat.java   |  18 +-
 .../java/org/apache/hudi/util/CompactionUtil.java  |  10 +-
 .../hudi/util/JsonDeserializationFunction.java     |   2 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |   9 +-
 .../apache/hudi/sink/ITTestDataStreamV2Write.java  |   4 +-
 .../apache/hudi/sink/ITTestDataStreamWrite.java    |   6 +-
 .../bucket/ITTestConsistentBucketStreamWrite.java  |   4 +-
 .../sink/cluster/ITTestHoodieFlinkClustering.java  |  16 +-
 .../apache/hudi/sink/compact/TestCustomSerDe.java  |  16 +-
 .../utils/BucketStreamWriteFunctionWrapper.java    |   4 +-
 .../hudi/sink/utils/BulkInsertFunctionWrapper.java |   4 +-
 .../hudi/sink/utils/InsertFunctionWrapper.java     |   4 +-
 .../sink/utils/StreamWriteFunctionWrapper.java     |   4 +-
 .../apache/hudi/source/TestStreamReadOperator.java |  10 +-
 .../apache/hudi/table/ITTestSchemaEvolution.java   |  57 ++-
 .../table/TestHoodieFileGroupReaderOnFlink.java    |  16 +-
 .../apache/hudi/table/TestHoodieTableSource.java   |   6 +-
 .../format/TestFlinkRowDataReaderContext.java      |   1 +
 .../apache/hudi/utils/TestAvroSchemaConverter.java |  10 +-
 .../test/java/org/apache/hudi/utils/TestData.java  |   6 +-
 36 files changed, 1235 insertions(+), 176 deletions(-)
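
At a high level, the migration swaps the Avro-based converter calls for the new HoodieSchema-based ones. A minimal sketch of the call-site pattern, distilled from the ClusteringOperator and HoodieFlinkClusteringJob hunks below (the wrapper class name here is illustrative only and not part of the commit):

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.schema.HoodieSchemaUtils;
    import org.apache.hudi.util.HoodieSchemaConverter;

    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.RowType;

    // Illustrative sketch only; the real call sites live in the operators below.
    class HoodieSchemaMigrationSketch {

      // Write/clustering path: Flink RowType -> HoodieSchema, forced nullable for reading,
      // replacing AvroSchemaConverter.convertToSchema + AvroSchemaUtils.asNullable.
      static HoodieSchema readerSchemaFor(RowType rowType) {
        HoodieSchema schema = HoodieSchemaConverter.convertToSchema(rowType);
        return HoodieSchemaUtils.asNullable(schema);
      }

      // Read path: table HoodieSchema -> Flink DataType/RowType,
      // replacing AvroSchemaConverter.convertToDataType on the avro Schema.
      static RowType rowTypeFor(HoodieSchema tableSchema) {
        DataType rowDataType = HoodieSchemaConverter.convertToDataType(tableSchema);
        return (RowType) rowDataType.getLogicalType();
      }
    }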

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
new file mode 100644
index 000000000000..14c59d5789a8
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.ReflectionUtils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Converts between Flink's LogicalType/DataType and HoodieSchema.
+ */
+public class HoodieSchemaConverter {
+
+  /**
+   * Converts a Flink LogicalType into a HoodieSchema.
+   *
+   * <p>Uses "record" as the default type name for record types.
+   *
+   * @param logicalType Flink logical type definition
+   * @return HoodieSchema matching the logical type
+   */
+  public static HoodieSchema convertToSchema(LogicalType logicalType) {
+    return convertToSchema(logicalType, "record");
+  }
+
+  /**
+   * Converts a Flink LogicalType into a HoodieSchema with the specified record name.
+   *
+   * <p>The "{rowName}." is used as the nested row type name prefix in order to generate
+   * the right schema. Nested record types that only differ by type name are still compatible.
+   *
+   * @param logicalType Flink logical type
+   * @param rowName     the record name
+   * @return HoodieSchema matching this logical type
+   */
+  public static HoodieSchema convertToSchema(LogicalType logicalType, String rowName) {
+    int precision;
+    boolean nullable = logicalType.isNullable();
+    HoodieSchema schema;
+
+    switch (logicalType.getTypeRoot()) {
+      case NULL:
+        return HoodieSchema.create(HoodieSchemaType.NULL);
+
+      case BOOLEAN:
+        schema = HoodieSchema.create(HoodieSchemaType.BOOLEAN);
+        break;
+
+      case TINYINT:
+      case SMALLINT:
+      case INTEGER:
+        schema = HoodieSchema.create(HoodieSchemaType.INT);
+        break;
+
+      case BIGINT:
+        schema = HoodieSchema.create(HoodieSchemaType.LONG);
+        break;
+
+      case FLOAT:
+        schema = HoodieSchema.create(HoodieSchemaType.FLOAT);
+        break;
+
+      case DOUBLE:
+        schema = HoodieSchema.create(HoodieSchemaType.DOUBLE);
+        break;
+
+      case CHAR:
+      case VARCHAR:
+        schema = HoodieSchema.create(HoodieSchemaType.STRING);
+        break;
+
+      case BINARY:
+      case VARBINARY:
+        schema = HoodieSchema.create(HoodieSchemaType.BYTES);
+        break;
+
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        final TimestampType timestampType = (TimestampType) logicalType;
+        precision = timestampType.getPrecision();
+        if (precision <= 3) {
+          schema = HoodieSchema.createTimestampMillis();
+        } else if (precision <= 6) {
+          schema = HoodieSchema.createTimestampMicros();
+        } else {
+          throw new IllegalArgumentException(
+              "HoodieSchema does not support TIMESTAMP type with precision: "
+                  + precision
+                  + ", it only supports precisions <= 6.");
+        }
+        break;
+
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        final LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) logicalType;
+        precision = localZonedTimestampType.getPrecision();
+        if (precision <= 3) {
+          schema = HoodieSchema.createLocalTimestampMillis();
+        } else if (precision <= 6) {
+          schema = HoodieSchema.createLocalTimestampMicros();
+        } else {
+          throw new IllegalArgumentException(
+              "HoodieSchema does not support LOCAL TIMESTAMP type with precision: "
+                  + precision
+                  + ", it only supports precisions <= 6.");
+        }
+        break;
+
+      case DATE:
+        schema = HoodieSchema.createDate();
+        break;
+
+      case TIME_WITHOUT_TIME_ZONE:
+        precision = ((TimeType) logicalType).getPrecision();
+        if (precision <= 3) {
+          schema = HoodieSchema.createTimeMillis();
+        } else if (precision <= 6) {
+          schema = HoodieSchema.createTimeMicros();
+        } else {
+          throw new IllegalArgumentException(
+              "HoodieSchema does not support TIME type with precision: "
+                  + precision
+                  + ", maximum precision is 6 (microseconds).");
+        }
+        break;
+
+      case DECIMAL:
+        DecimalType decimalType = (DecimalType) logicalType;
+        int fixedSize = computeMinBytesForDecimalPrecision(decimalType.getPrecision());
+        schema = HoodieSchema.createDecimal(
+            String.format("%s.fixed", rowName),
+            null,
+            null,
+            decimalType.getPrecision(),
+            decimalType.getScale(),
+            fixedSize
+        );
+        break;
+
+      case ROW:
+        RowType rowType = (RowType) logicalType;
+        List<String> fieldNames = rowType.getFieldNames();
+
+        List<HoodieSchemaField> hoodieFields = new ArrayList<>();
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+          String fieldName = fieldNames.get(i);
+          LogicalType fieldType = rowType.getTypeAt(i);
+
+          // Recursive call for field schema
+          HoodieSchema fieldSchema = convertToSchema(fieldType, rowName + "." + fieldName);
+
+          // Create field with or without default value
+          HoodieSchemaField field;
+          if (fieldType.isNullable()) {
+            field = HoodieSchemaField.of(fieldName, fieldSchema, null, HoodieSchema.NULL_VALUE);
+          } else {
+            field = HoodieSchemaField.of(fieldName, fieldSchema);
+          }
+          hoodieFields.add(field);
+        }
+
+        schema = HoodieSchema.createRecord(rowName, null, null, hoodieFields);
+        break;
+
+      case MULTISET:
+      case MAP:
+        LogicalType valueType = extractValueTypeForMap(logicalType);
+        HoodieSchema valueSchema = convertToSchema(valueType, rowName);
+        schema = HoodieSchema.createMap(valueSchema);
+        break;
+
+      case ARRAY:
+        ArrayType arrayType = (ArrayType) logicalType;
+        HoodieSchema elementSchema = convertToSchema(arrayType.getElementType(), rowName);
+        schema = HoodieSchema.createArray(elementSchema);
+        break;
+
+      case RAW:
+      default:
+        throw new UnsupportedOperationException(
+            "Unsupported type for HoodieSchema conversion: " + logicalType);
+    }
+
+    return nullable ? HoodieSchema.createNullable(schema) : schema;
+  }
+
+  /**
+   * Extracts value type for map conversion.
+   * Maps must have string keys for Avro/HoodieSchema compatibility.
+   */
+  private static LogicalType extractValueTypeForMap(LogicalType type) {
+    LogicalType keyType;
+    LogicalType valueType;
+    if (type instanceof MapType) {
+      MapType mapType = (MapType) type;
+      keyType = mapType.getKeyType();
+      valueType = mapType.getValueType();
+    } else {
+      MultisetType multisetType = (MultisetType) type;
+      keyType = multisetType.getElementType();
+      valueType = new IntType();
+    }
+    if (!isFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) {
+      throw new UnsupportedOperationException(
+          "HoodieSchema doesn't support non-string as key type of map. "
+              + "The key type is: "
+              + keyType.asSummaryString());
+    }
+    return valueType;
+  }
+
+  /**
+   * Returns whether the given logical type belongs to the family.
+   */
+  private static boolean isFamily(LogicalType logicalType, LogicalTypeFamily family) {
+    return logicalType.getTypeRoot().getFamilies().contains(family);
+  }
+
+  /**
+   * Computes minimum bytes needed for decimal precision.
+   * This ensures compatibility with Avro fixed-size decimal representation.
+   */
+  private static int computeMinBytesForDecimalPrecision(int precision) {
+    int numBytes = 1;
+    while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
+      numBytes += 1;
+    }
+    return numBytes;
+  }
+
+  // ===== Conversion from HoodieSchema to Flink DataType =====
+
+  /**
+   * Converts a HoodieSchema into Flink's DataType.
+   *
+   * <p>This method provides native conversion from HoodieSchema to Flink DataType
+   * without going through an Avro intermediate representation, future-proofing the
+   * implementation against changes in the Avro layer.
+   *
+   * @param hoodieSchema the HoodieSchema to convert
+   * @return Flink DataType matching the schema
+   * @throws IllegalArgumentException if the schema contains unsupported types
+   */
+  public static DataType convertToDataType(HoodieSchema hoodieSchema) {
+    if (hoodieSchema == null) {
+      throw new IllegalArgumentException("HoodieSchema cannot be null");
+    }
+
+    HoodieSchemaType type = hoodieSchema.getType();
+
+    switch (type) {
+      case NULL:
+        return DataTypes.NULL();
+      case BOOLEAN:
+        return DataTypes.BOOLEAN().notNull();
+      case INT:
+        return DataTypes.INT().notNull();
+      case LONG:
+        return DataTypes.BIGINT().notNull();
+      case FLOAT:
+        return DataTypes.FLOAT().notNull();
+      case DOUBLE:
+        return DataTypes.DOUBLE().notNull();
+      case BYTES:
+        return DataTypes.BYTES().notNull();
+      case STRING:
+        return DataTypes.STRING().notNull();
+      case ENUM:
+        // Flink doesn't have native enum type, convert to STRING
+        return DataTypes.STRING().notNull();
+      case FIXED:
+        return DataTypes.VARBINARY(hoodieSchema.getFixedSize()).notNull();
+      case DECIMAL:
+        return convertDecimal(hoodieSchema);
+      case DATE:
+        return DataTypes.DATE().notNull();
+      case TIME:
+        return convertTime(hoodieSchema);
+      case TIMESTAMP:
+        return convertTimestamp(hoodieSchema);
+      case UUID:
+        return DataTypes.STRING().notNull();
+      case ARRAY:
+        return convertArray(hoodieSchema);
+      case MAP:
+        return convertMap(hoodieSchema);
+      case RECORD:
+        return convertRecord(hoodieSchema);
+      case UNION:
+        return convertUnion(hoodieSchema);
+      default:
+        throw new IllegalArgumentException("Unsupported HoodieSchemaType: " + type);
+    }
+  }
+
+  /**
+   * Converts a HoodieSchema (RECORD type) into a Flink RowType.
+   *
+   * @param schema HoodieSchema to convert (must be a RECORD type)
+   * @return RowType matching the HoodieSchema structure
+   * @throws IllegalArgumentException if schema is null or not a RECORD type
+   */
+  public static RowType convertToRowType(HoodieSchema schema) {
+    if (schema == null) {
+      throw new IllegalArgumentException("HoodieSchema cannot be null");
+    }
+    if (schema.getType() != HoodieSchemaType.RECORD) {
+      throw new IllegalArgumentException(
+          "Only RECORD type schemas can be converted to RowType, got: " + schema.getType());
+    }
+
+    DataType dataType = convertToDataType(schema);
+    return (RowType) dataType.getLogicalType();
+  }
+
+  private static DataType convertDecimal(HoodieSchema schema) {
+    if (!(schema instanceof HoodieSchema.Decimal)) {
+      throw new IllegalStateException("Expected HoodieSchema.Decimal but got: " + schema.getClass());
+    }
+    HoodieSchema.Decimal decimalSchema = (HoodieSchema.Decimal) schema;
+    return DataTypes.DECIMAL(decimalSchema.getPrecision(), decimalSchema.getScale()).notNull();
+  }
+
+  private static DataType convertTimestamp(HoodieSchema schema) {
+    if (!(schema.getType() == HoodieSchemaType.TIMESTAMP)) {
+      throw new IllegalStateException("Expected HoodieSchema.Timestamp but got: " + schema.getClass());
+    }
+    HoodieSchema.Timestamp timestampSchema = (HoodieSchema.Timestamp) schema;
+    int flinkPrecision = (timestampSchema.getPrecision() == HoodieSchema.TimePrecision.MILLIS) ? 3 : 6;
+
+    if (timestampSchema.isUtcAdjusted()) {
+      return DataTypes.TIMESTAMP(flinkPrecision).notNull();
+    } else {
+      return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(flinkPrecision).notNull();
+    }
+  }
+
+  private static DataType convertTime(HoodieSchema schema) {
+    if (!(schema.getType() == HoodieSchemaType.TIME)) {
+      throw new IllegalStateException("Expected HoodieSchema.Time but got: " + schema.getClass());
+    }
+    HoodieSchema.Time timeSchema = (HoodieSchema.Time) schema;
+    int flinkPrecision = (timeSchema.getPrecision() == HoodieSchema.TimePrecision.MILLIS) ? 3 : 6;
+    return DataTypes.TIME(flinkPrecision).notNull();
+  }
+
+  private static DataType convertRecord(HoodieSchema schema) {
+    List<HoodieSchemaField> fields = schema.getFields();
+    DataTypes.Field[] flinkFields = new DataTypes.Field[fields.size()];
+
+    for (int i = 0; i < fields.size(); i++) {
+      HoodieSchemaField field = fields.get(i);
+      DataType fieldType = convertToDataType(field.schema());
+      flinkFields[i] = DataTypes.FIELD(field.name(), fieldType);
+    }
+
+    return DataTypes.ROW(flinkFields).notNull();
+  }
+
+  private static DataType convertArray(HoodieSchema schema) {
+    HoodieSchema elementSchema = schema.getElementType();
+    DataType elementType = convertToDataType(elementSchema);
+    return DataTypes.ARRAY(elementType).notNull();
+  }
+
+  private static DataType convertMap(HoodieSchema schema) {
+    HoodieSchema valueSchema = schema.getValueType();
+    DataType valueType = convertToDataType(valueSchema);
+    return DataTypes.MAP(DataTypes.STRING().notNull(), valueType).notNull();
+  }
+
+  private static DataType convertUnion(HoodieSchema schema) {
+    List<HoodieSchema> unionTypes = schema.getTypes();
+
+    // Simple nullable union [null, T]
+    if (schema.isNullable() && unionTypes.size() == 2) {
+      HoodieSchema nonNullType = schema.getNonNullType();
+      DataType converted = convertToDataType(nonNullType);
+      return converted.nullable();
+    }
+
+    // Single-type union
+    if (unionTypes.size() == 1) {
+      return convertToDataType(unionTypes.get(0));
+    }
+
+    // Complex multi-type unions - use RAW type (matches AvroSchemaConverter logic)
+    List<HoodieSchema> nonNullTypes = unionTypes.stream()
+        .filter(t -> t.getType() != HoodieSchemaType.NULL)
+        .collect(Collectors.toList());
+
+    boolean nullable = unionTypes.size() > nonNullTypes.size();
+
+    // Use RAW type for complex unions
+    DataType rawDataType = (DataType) ReflectionUtils.invokeStaticMethod(
+        "org.apache.hudi.utils.DataTypeUtils",
+        "createAtomicRawType",
+        new Object[] {false, Types.GENERIC(Object.class)},
+        Boolean.class,
+        TypeInformation.class);
+
+    if (recordTypesOfSameNumFields(nonNullTypes)) {
+      DataType converted = DataTypes.ROW(
+              DataTypes.FIELD("wrapper", rawDataType))
+          .notNull();
+      return nullable ? converted.nullable() : converted;
+    }
+
+    return nullable ? rawDataType.nullable() : rawDataType;
+  }
+
+  /**
+   * Returns true if all the types are RECORD type with same number of fields.
+   */
+  private static boolean recordTypesOfSameNumFields(List<HoodieSchema> types) {
+    if (types == null || types.isEmpty()) {
+      return false;
+    }
+    if (types.stream().anyMatch(s -> s.getType() != HoodieSchemaType.RECORD)) {
+      return false;
+    }
+    int numFields = types.get(0).getFields().size();
+    return types.stream().allMatch(s -> s.getFields().size() == numFields);
+  }
+}
\ No newline at end of file
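
For the decimal mapping above, computeMinBytesForDecimalPrecision picks the smallest byte count n such that 2^(8n-1) >= 10^precision. As a worked example (illustrative, not part of the commit): for DECIMAL(10, 2), 2^31 is roughly 2.1e9, which is still below 10^10, while 2^39 is roughly 5.5e11, which clears it, so the fixed size is 5 bytes — the same value asserted for precision 10 in the tests below.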
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
new file mode 100644
index 000000000000..c4892c48e696
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieSchemaConverter}.
+ */
+public class TestHoodieSchemaConverter {
+
+  @Test
+  public void testPrimitiveTypes() {
+    // String
+    HoodieSchema stringSchema = HoodieSchemaConverter.convertToSchema(
+        DataTypes.STRING().notNull().getLogicalType());
+    assertEquals(HoodieSchemaType.STRING, stringSchema.getType());
+
+    // Int
+    HoodieSchema intSchema = HoodieSchemaConverter.convertToSchema(
+        DataTypes.INT().notNull().getLogicalType());
+    assertEquals(HoodieSchemaType.INT, intSchema.getType());
+
+    // Long
+    HoodieSchema longSchema = HoodieSchemaConverter.convertToSchema(
+        DataTypes.BIGINT().notNull().getLogicalType());
+    assertEquals(HoodieSchemaType.LONG, longSchema.getType());
+
+    // Float
+    HoodieSchema floatSchema = HoodieSchemaConverter.convertToSchema(
+        DataTypes.FLOAT().notNull().getLogicalType());
+    assertEquals(HoodieSchemaType.FLOAT, floatSchema.getType());
+
+    // Double
+    HoodieSchema doubleSchema = HoodieSchemaConverter.convertToSchema(
+        DataTypes.DOUBLE().notNull().getLogicalType());
+    assertEquals(HoodieSchemaType.DOUBLE, doubleSchema.getType());
+
+    // Boolean
+    HoodieSchema boolSchema = HoodieSchemaConverter.convertToSchema(
+        DataTypes.BOOLEAN().notNull().getLogicalType());
+    assertEquals(HoodieSchemaType.BOOLEAN, boolSchema.getType());
+
+    // Bytes
+    HoodieSchema bytesSchema = HoodieSchemaConverter.convertToSchema(
+        DataTypes.BYTES().notNull().getLogicalType());
+    assertEquals(HoodieSchemaType.BYTES, bytesSchema.getType());
+  }
+
+  @Test
+  public void testNullableTypes() {
+    HoodieSchema nullableString = HoodieSchemaConverter.convertToSchema(
+        DataTypes.STRING().nullable().getLogicalType());
+    assertEquals(HoodieSchemaType.UNION, nullableString.getType());
+    assertTrue(nullableString.isNullable());
+
+    HoodieSchema nullableInt = HoodieSchemaConverter.convertToSchema(
+        DataTypes.INT().nullable().getLogicalType());
+    assertEquals(HoodieSchemaType.UNION, nullableInt.getType());
+    assertTrue(nullableInt.isNullable());
+  }
+
+  @Test
+  public void testTemporalTypes() {
+    // Date
+    HoodieSchema dateSchema = HoodieSchemaConverter.convertToSchema(
+        DataTypes.DATE().notNull().getLogicalType());
+    assertEquals(HoodieSchemaType.DATE, dateSchema.getType());
+
+    // Time
+    HoodieSchema timeSchema = HoodieSchemaConverter.convertToSchema(
+        DataTypes.TIME(3).notNull().getLogicalType());
+    assertEquals(HoodieSchemaType.TIME, timeSchema.getType());
+    assertEquals(HoodieSchema.TimePrecision.MILLIS, ((HoodieSchema.Time) timeSchema).getPrecision());
+
+    // Time micros
+    HoodieSchema timeMicrosSchema = HoodieSchemaConverter.convertToSchema(
+            DataTypes.TIME(6).notNull().getLogicalType());
+    assertEquals(HoodieSchemaType.TIME, timeMicrosSchema.getType());
+    assertEquals(HoodieSchema.TimePrecision.MICROS, ((HoodieSchema.Time) timeMicrosSchema).getPrecision());
+
+    // Timestamp millis
+    HoodieSchema timestampMillisSchema = HoodieSchemaConverter.convertToSchema(
+        DataTypes.TIMESTAMP(3).notNull().getLogicalType());
+    assertEquals(HoodieSchemaType.TIMESTAMP, timestampMillisSchema.getType());
+    assertTrue(timestampMillisSchema instanceof HoodieSchema.Timestamp);
+    assertEquals(HoodieSchema.TimePrecision.MILLIS,
+        ((HoodieSchema.Timestamp) timestampMillisSchema).getPrecision());
+
+    // Timestamp micros
+    HoodieSchema timestampMicrosSchema = HoodieSchemaConverter.convertToSchema(
+        DataTypes.TIMESTAMP(6).notNull().getLogicalType());
+    assertEquals(HoodieSchemaType.TIMESTAMP, timestampMicrosSchema.getType());
+    assertTrue(timestampMicrosSchema instanceof HoodieSchema.Timestamp);
+    assertEquals(HoodieSchema.TimePrecision.MICROS,
+        ((HoodieSchema.Timestamp) timestampMicrosSchema).getPrecision());
+
+    // Local timestamp millis
+    HoodieSchema localTimestampMillisSchema = HoodieSchemaConverter.convertToSchema(
+        DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull().getLogicalType());
+    assertEquals(HoodieSchemaType.TIMESTAMP, localTimestampMillisSchema.getType());
+
+    // Local timestamp micros
+    HoodieSchema localTimestampMicrosSchema = HoodieSchemaConverter.convertToSchema(
+        DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull().getLogicalType());
+    assertEquals(HoodieSchemaType.TIMESTAMP, localTimestampMicrosSchema.getType());
+  }
+
+  @Test
+  public void testDecimalType() {
+    HoodieSchema decimalSchema = HoodieSchemaConverter.convertToSchema(
+        DataTypes.DECIMAL(10, 2).notNull().getLogicalType(), "test");
+    assertEquals(HoodieSchemaType.DECIMAL, decimalSchema.getType());
+    assertTrue(decimalSchema instanceof HoodieSchema.Decimal);
+
+    HoodieSchema.Decimal decimal = (HoodieSchema.Decimal) decimalSchema;
+    assertEquals(10, decimal.getPrecision());
+    assertEquals(2, decimal.getScale());
+  }
+
+  @Test
+  public void testArrayType() {
+    LogicalType arrayType = DataTypes.ARRAY(DataTypes.STRING().notNull()).notNull().getLogicalType();
+    HoodieSchema arraySchema = HoodieSchemaConverter.convertToSchema(arrayType);
+
+    assertEquals(HoodieSchemaType.ARRAY, arraySchema.getType());
+    assertEquals(HoodieSchemaType.STRING, arraySchema.getElementType().getType());
+  }
+
+  @Test
+  public void testArrayTypeWithNullableElements() {
+    LogicalType arrayType = DataTypes.ARRAY(DataTypes.STRING().nullable()).notNull().getLogicalType();
+    HoodieSchema arraySchema = HoodieSchemaConverter.convertToSchema(arrayType);
+
+    assertEquals(HoodieSchemaType.ARRAY, arraySchema.getType());
+
+    HoodieSchema elementSchema = arraySchema.getElementType();
+    assertEquals(HoodieSchemaType.UNION, elementSchema.getType());
+    assertTrue(elementSchema.isNullable());
+
+    HoodieSchema actualElementType = elementSchema.getNonNullType();
+    assertEquals(HoodieSchemaType.STRING, actualElementType.getType());
+  }
+
+  @Test
+  public void testMapType() {
+    LogicalType mapType = DataTypes.MAP(
+        DataTypes.STRING().notNull(),
+        DataTypes.INT().notNull()).notNull().getLogicalType();
+    HoodieSchema mapSchema = HoodieSchemaConverter.convertToSchema(mapType);
+
+    assertEquals(HoodieSchemaType.MAP, mapSchema.getType());
+    assertEquals(HoodieSchemaType.INT, mapSchema.getValueType().getType());
+  }
+
+  @Test
+  public void testMapTypeWithNullableValues() {
+    LogicalType mapType = DataTypes.MAP(
+        DataTypes.STRING().notNull(),
+        DataTypes.INT().nullable()).notNull().getLogicalType();
+    HoodieSchema mapSchema = HoodieSchemaConverter.convertToSchema(mapType);
+
+    assertEquals(HoodieSchemaType.MAP, mapSchema.getType());
+
+    HoodieSchema valueSchema = mapSchema.getValueType();
+    assertEquals(HoodieSchemaType.UNION, valueSchema.getType());
+    assertTrue(valueSchema.isNullable());
+
+    HoodieSchema actualValueType = valueSchema.getNonNullType();
+    assertEquals(HoodieSchemaType.INT, actualValueType.getType());
+  }
+
+  @Test
+  public void testRecordType() {
+    LogicalType recordType = DataTypes.ROW(
+        DataTypes.FIELD("id", DataTypes.INT().notNull()),
+        DataTypes.FIELD("name", DataTypes.STRING().notNull()),
+        DataTypes.FIELD("age", DataTypes.INT().nullable())
+    ).notNull().getLogicalType();
+
+    HoodieSchema recordSchema = HoodieSchemaConverter.convertToSchema(recordType, "Person");
+
+    assertEquals(HoodieSchemaType.RECORD, recordSchema.getType());
+    assertEquals(3, recordSchema.getFields().size());
+    assertEquals("id", recordSchema.getFields().get(0).name());
+    assertEquals("name", recordSchema.getFields().get(1).name());
+    assertEquals("age", recordSchema.getFields().get(2).name());
+
+    // Verify nullable field
+    assertTrue(recordSchema.getFields().get(2).schema().isNullable());
+  }
+
+  @Test
+  public void testNestedRecordType() {
+    LogicalType nestedRecordType = DataTypes.ROW(
+        DataTypes.FIELD("id", DataTypes.INT().notNull()),
+        DataTypes.FIELD("address", DataTypes.ROW(
+            DataTypes.FIELD("street", DataTypes.STRING().notNull()),
+            DataTypes.FIELD("city", DataTypes.STRING().notNull())
+        ).notNull())
+    ).notNull().getLogicalType();
+
+    HoodieSchema nestedSchema = HoodieSchemaConverter.convertToSchema(nestedRecordType, "User");
+
+    assertEquals(HoodieSchemaType.RECORD, nestedSchema.getType());
+    assertEquals(2, nestedSchema.getFields().size());
+
+    HoodieSchema addressSchema = nestedSchema.getFields().get(1).schema();
+    assertEquals(HoodieSchemaType.RECORD, addressSchema.getType());
+    assertEquals(2, addressSchema.getFields().size());
+  }
+
+  @Test
+  public void testCompareWithAvroConversion() {
+    // Test that HoodieSchemaConverter produces the same result as
+    // AvroSchemaConverter + HoodieSchema.fromAvroSchema()
+
+    RowType flinkRowType = (RowType) DataTypes.ROW(
+        DataTypes.FIELD("id", DataTypes.BIGINT().notNull()),
+        DataTypes.FIELD("name", DataTypes.STRING().nullable()),
+        DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP(3).notNull()),
+        DataTypes.FIELD("decimal_val", DataTypes.DECIMAL(10, 2).notNull())
+    ).notNull().getLogicalType();
+
+    // Method 1: Direct HoodieSchema conversion
+    HoodieSchema directSchema = HoodieSchemaConverter.convertToSchema(flinkRowType, "TestRecord");
+
+    // Method 2: Via Avro conversion
+    HoodieSchema viaAvroSchema = HoodieSchema.fromAvroSchema(
+        AvroSchemaConverter.convertToSchema(flinkRowType, "TestRecord"));
+
+    // Both should produce equivalent schemas
+    assertNotNull(directSchema);
+    assertNotNull(viaAvroSchema);
+    assertEquals(HoodieSchemaType.RECORD, directSchema.getType());
+    assertEquals(HoodieSchemaType.RECORD, viaAvroSchema.getType());
+    assertEquals(4, directSchema.getFields().size());
+    assertEquals(4, viaAvroSchema.getFields().size());
+
+    // Verify field types match
+    for (int i = 0; i < 4; i++) {
+      assertEquals(
+          viaAvroSchema.getFields().get(i).schema().getType(),
+          directSchema.getFields().get(i).schema().getType(),
+          "Field " + i + " type mismatch"
+      );
+    }
+  }
+
+  @Test
+  public void testComplexNestedStructure() {
+    LogicalType complexType = DataTypes.ROW(
+        DataTypes.FIELD("id", DataTypes.STRING().notNull()),
+        DataTypes.FIELD("tags", DataTypes.ARRAY(DataTypes.STRING().notNull()).notNull()),
+        DataTypes.FIELD("metadata", DataTypes.MAP(
+            DataTypes.STRING().notNull(),
+            DataTypes.STRING().notNull()).notNull()),
+        DataTypes.FIELD("nested", DataTypes.ROW(
+            DataTypes.FIELD("value", DataTypes.DOUBLE().notNull()),
+            DataTypes.FIELD("items", DataTypes.ARRAY(DataTypes.INT().notNull()).notNull())
+        ).notNull())
+    ).notNull().getLogicalType();
+
+    HoodieSchema complexSchema = HoodieSchemaConverter.convertToSchema(complexType, "ComplexRecord");
+
+    assertNotNull(complexSchema);
+    assertEquals(HoodieSchemaType.RECORD, complexSchema.getType());
+    assertEquals(4, complexSchema.getFields().size());
+
+    // Verify array field
+    assertEquals(HoodieSchemaType.ARRAY, complexSchema.getFields().get(1).schema().getType());
+
+    // Verify map field
+    assertEquals(HoodieSchemaType.MAP, complexSchema.getFields().get(2).schema().getType());
+
+    // Verify nested record
+    HoodieSchema nestedRecord = complexSchema.getFields().get(3).schema();
+    assertEquals(HoodieSchemaType.RECORD, nestedRecord.getType());
+    assertEquals(2, nestedRecord.getFields().size());
+  }
+
+  @Test
+  public void testNativeConversionMatchesAvroPath() {
+    // Verify native conversion produces same result as Avro path
+    RowType originalRowType = (RowType) DataTypes.ROW(
+        DataTypes.FIELD("id", DataTypes.BIGINT().notNull()),
+        DataTypes.FIELD("name", DataTypes.STRING().nullable()),
+        DataTypes.FIELD("age", DataTypes.INT().notNull())
+    ).notNull().getLogicalType();
+
+    HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(originalRowType, "TestRecord");
+
+    // Native conversion
+    DataType nativeResult = HoodieSchemaConverter.convertToDataType(hoodieSchema);
+
+    // Avro path (for comparison)
+    DataType avroResult = AvroSchemaConverter.convertToDataType(hoodieSchema.getAvroSchema());
+
+    assertEquals(avroResult.getLogicalType(), nativeResult.getLogicalType());
+  }
+
+  @Test
+  public void testRoundTripConversion() {
+    RowType originalRowType = (RowType) DataTypes.ROW(
+        DataTypes.FIELD("id", DataTypes.BIGINT().notNull()),
+        DataTypes.FIELD("name", DataTypes.STRING().nullable()),
+        DataTypes.FIELD("age", DataTypes.INT().notNull())
+    ).notNull().getLogicalType();
+
+    HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(originalRowType, "TestRecord");
+    RowType convertedRowType = HoodieSchemaConverter.convertToRowType(hoodieSchema);
+
+    assertEquals(originalRowType, convertedRowType);
+  }
+
+  @Test
+  public void testConvertPrimitiveTypesToDataType() {
+    HoodieSchema hoodieSchema = HoodieSchema.createRecord(
+        "test_record",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("string_col", HoodieSchema.create(HoodieSchemaType.STRING)),
+            HoodieSchemaField.of("int_col", HoodieSchema.create(HoodieSchemaType.INT)),
+            HoodieSchemaField.of("long_col", HoodieSchema.create(HoodieSchemaType.LONG)),
+            HoodieSchemaField.of("float_col", HoodieSchema.create(HoodieSchemaType.FLOAT)),
+            HoodieSchemaField.of("double_col", HoodieSchema.create(HoodieSchemaType.DOUBLE)),
+            HoodieSchemaField.of("boolean_col", HoodieSchema.create(HoodieSchemaType.BOOLEAN)),
+            HoodieSchemaField.of("bytes_col", HoodieSchema.create(HoodieSchemaType.BYTES))
+        )
+    );
+
+    RowType result = HoodieSchemaConverter.convertToRowType(hoodieSchema);
+
+    assertEquals(7, result.getFieldCount());
+    // Verify each field name
+    assertEquals("string_col", result.getFieldNames().get(0));
+    assertEquals("int_col", result.getFieldNames().get(1));
+    assertEquals("long_col", result.getFieldNames().get(2));
+    assertEquals("float_col", result.getFieldNames().get(3));
+    assertEquals("double_col", result.getFieldNames().get(4));
+    assertEquals("boolean_col", result.getFieldNames().get(5));
+    assertEquals("bytes_col", result.getFieldNames().get(6));
+  }
+
+  @Test
+  public void testConvertNullableTypesToDataType() {
+    HoodieSchema hoodieSchema = HoodieSchema.createRecord(
+        "test_record",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("nullable_string",
+                HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING))),
+            HoodieSchemaField.of("nullable_int",
+                HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.INT)))
+        )
+    );
+
+    RowType result = HoodieSchemaConverter.convertToRowType(hoodieSchema);
+
+    assertTrue(result.getTypeAt(0).isNullable());
+    assertTrue(result.getTypeAt(1).isNullable());
+    assertEquals("nullable_string", result.getFieldNames().get(0));
+    assertEquals("nullable_int", result.getFieldNames().get(1));
+  }
+
+  @Test
+  public void testConvertTemporalTypesToDataType() {
+    HoodieSchema hoodieSchema = HoodieSchema.createRecord(
+        "test_record",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("date_col", HoodieSchema.createDate()),
+            HoodieSchemaField.of("time_col", HoodieSchema.createTimeMillis()),
+            HoodieSchemaField.of("timestamp_millis", HoodieSchema.createTimestampMillis()),
+            HoodieSchemaField.of("timestamp_micros", HoodieSchema.createTimestampMicros()),
+            HoodieSchemaField.of("local_timestamp_millis", HoodieSchema.createLocalTimestampMillis()),
+            HoodieSchemaField.of("local_timestamp_micros", HoodieSchema.createLocalTimestampMicros())
+        )
+    );
+
+    RowType result = HoodieSchemaConverter.convertToRowType(hoodieSchema);
+
+    assertEquals(6, result.getFieldCount());
+    assertEquals(3, ((TimestampType) result.getTypeAt(2)).getPrecision());
+    assertEquals(6, ((TimestampType) result.getTypeAt(3)).getPrecision());
+  }
+
+  @Test
+  public void testConvertDecimalTypeToDataType() {
+    // Create a FIXED-backed decimal HoodieSchema directly
+    HoodieSchema decimalSchema = HoodieSchema.createDecimal(
+        "decimal_col.fixed",  // name
+        null,                 // namespace
+        null,                 // doc
+        10,                   // precision
+        2,                    // scale
+        5                     // fixedSize (5 bytes for precision 10)
+    );
+
+    // Wrap in a record structure
+    HoodieSchema hoodieSchema = HoodieSchema.createRecord(
+        "test_record",
+        null,
+        null,
+        Arrays.asList(HoodieSchemaField.of("decimal_col", decimalSchema))
+    );
+
+    // Verify the decimal is FIXED-backed (backed by fixed-size byte array)
+    HoodieSchema decimalField = hoodieSchema.getFields().get(0).schema();
+    assertTrue(decimalField instanceof HoodieSchema.Decimal);
+    HoodieSchema.Decimal decimalSchemaTyped = (HoodieSchema.Decimal) decimalField;
+    assertTrue(decimalSchemaTyped.isFixed()); // Verify it's FIXED-backed
+    assertEquals(5, decimalSchemaTyped.getFixedSize()); // For precision 10, fixed size is 5 bytes
+    assertEquals(10, decimalSchemaTyped.getPrecision());
+    assertEquals(2, decimalSchemaTyped.getScale());
+
+    // Convert to Flink RowType
+    RowType result = HoodieSchemaConverter.convertToRowType(hoodieSchema);
+
+    // Verify conversion
+    assertTrue(result.getTypeAt(0) instanceof DecimalType);
+    DecimalType decimal = (DecimalType) result.getTypeAt(0);
+    assertEquals(10, decimal.getPrecision());
+    assertEquals(2, decimal.getScale());
+  }
+
+  @Test
+  public void testConvertComplexTypesToDataType() {
+    HoodieSchema nestedRecord = HoodieSchema.createRecord(
+        "nested_record",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("nested_id", HoodieSchema.create(HoodieSchemaType.INT)),
+            HoodieSchemaField.of("nested_name", HoodieSchema.create(HoodieSchemaType.STRING))
+        )
+    );
+
+    HoodieSchema hoodieSchema = HoodieSchema.createRecord(
+        "test_record",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("array_col",
+                HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING))),
+            HoodieSchemaField.of("map_col",
+                HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT))),
+            HoodieSchemaField.of("nested_record", nestedRecord)
+        )
+    );
+
+    RowType result = HoodieSchemaConverter.convertToRowType(hoodieSchema);
+
+    assertEquals(3, result.getFieldCount());
+    assertTrue(result.getTypeAt(0) instanceof ArrayType);
+    assertTrue(result.getTypeAt(1) instanceof MapType);
+    assertTrue(result.getTypeAt(2) instanceof RowType);
+  }
+
+  @Test
+  public void testConvertNullSchemaThrowsException() {
+    assertThrows(IllegalArgumentException.class, () -> {
+      HoodieSchemaConverter.convertToRowType(null);
+    });
+  }
+
+  @Test
+  public void testConvertNonRecordSchemaThrowsException() {
+    HoodieSchema stringSchema = HoodieSchema.create(HoodieSchemaType.STRING);
+    assertThrows(IllegalArgumentException.class, () -> {
+      HoodieSchemaConverter.convertToRowType(stringSchema);
+    });
+  }
+
+  @Test
+  public void testEnumToStringConversion() {
+    HoodieSchema enumSchema = HoodieSchema.createEnum(
+        "Color", null, null, Arrays.asList("RED", "GREEN", "BLUE"));
+
+    DataType dataType = HoodieSchemaConverter.convertToDataType(enumSchema);
+    assertTrue(dataType.getLogicalType() instanceof VarCharType);
+  }
+
+  @Test
+  public void testFixedConversion() {
+    HoodieSchema fixedSchema = HoodieSchema.createFixed("MD5", null, null, 16);
+    DataType dataType = HoodieSchemaConverter.convertToDataType(fixedSchema);
+    assertTrue(dataType.getLogicalType() instanceof VarBinaryType);
+  }
+}
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
index 1af098872d21..dd60e867f184 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
@@ -143,6 +143,17 @@ public class HoodieSchema implements Serializable {
     return new HoodieSchema.Parser(validateDefaults).parse(jsonSchema);
   }
 
+  /**
+   * Parses an InputStream and returns the corresponding HoodieSchema.
+   *
+   * @param inputStream the InputStream to parse
+   * @return parsed HoodieSchema
+   * @throws HoodieAvroSchemaException if the schema content is invalid
+   */
+  public static HoodieSchema parse(InputStream inputStream) {
+    return new HoodieSchema.Parser().parse(inputStream);
+  }
+
   /**
    * Creates a schema for the specified primitive type.
    *
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
index a86979bc51fe..8334f0408270 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
@@ -18,8 +18,10 @@
 
 package org.apache.hudi.common.schema;
 
+import org.apache.avro.JsonProperties;
 import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -51,6 +53,9 @@ import java.util.stream.Collectors;
  */
 public final class HoodieSchemaUtils {
 
+  public static final HoodieSchema METADATA_FIELD_SCHEMA = HoodieSchema.createNullable(HoodieSchemaType.STRING);
+  public static final HoodieSchema RECORD_KEY_SCHEMA = initRecordKeySchema();
+
   // Private constructor to prevent instantiation
   private HoodieSchemaUtils() {
     throw new UnsupportedOperationException("Utility class cannot be instantiated");
@@ -157,6 +162,21 @@ public final class HoodieSchemaUtils {
     return HoodieSchema.fromAvroSchema(nullableAvro);
   }
 
+  /**
+   * Create a new schema by force changing all the fields as nullable.
+   * This is equivalent to AvroSchemaUtils.asNullable() but operates on HoodieSchema.
+   *
+   * @return a new schema with all the fields updated as nullable
+   * @throws IllegalArgumentException if schema is null
+   */
+  public static HoodieSchema asNullable(HoodieSchema schema) {
+    ValidationUtils.checkArgument(schema != null, "Schema cannot be null");
+
+    // Delegate to AvroSchemaUtils
+    Schema nullableAvro = AvroSchemaUtils.asNullable(schema.toAvroSchema());
+    return HoodieSchema.fromAvroSchema(nullableAvro);
+  }
+
   /**
    * Removes specified fields from a RECORD schema.
    * This is equivalent to HoodieAvroUtils.removeFields() but operates on HoodieSchema.
@@ -536,4 +556,21 @@ public final class HoodieSchemaUtils {
     HoodieSchema newSchema = createNewSchemaFromFieldsWithReference(foundSchema, Collections.singletonList(nestedPart.get()));
     return Option.of(createNewSchemaField(foundField.name(), isUnion ? createNullableSchema(newSchema) : newSchema, foundField.doc().orElse(null), foundField.defaultVal().orElse(null)));
   }
+
+  private static HoodieSchema initRecordKeySchema() {
+    HoodieSchemaField recordKeyField =
+            createNewSchemaField(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
+    return HoodieSchema.createRecord(
+            "HoodieRecordKey",
+            "",
+            "",
+            false,
+            Collections.singletonList(recordKeyField)
+    );
+  }
+
+  public static HoodieSchema getRecordKeySchema() {
+    return RECORD_KEY_SCHEMA;
+  }
+
 }
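
For reference, the RECORD_KEY_SCHEMA built above is a single-field record; a caller-side sketch (illustrative only, assuming the metadata field constant keeps its usual Hudi value):

    // HoodieRecord.RECORD_KEY_METADATA_FIELD is "_hoodie_record_key" in Hudi.
    HoodieSchema keySchema = HoodieSchemaUtils.getRecordKeySchema();
    HoodieSchemaField keyField = keySchema.getFields().get(0);
    // keyField.name() -> "_hoodie_record_key"; its schema is METADATA_FIELD_SCHEMA,
    // i.e. a nullable STRING (union of NULL and STRING).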
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
index 25ea4c16f7e1..f68de07cf4ab 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
@@ -120,7 +120,8 @@ public class TestHoodieSchema {
     }, "Should throw exception for invalid JSON schema");
 
     assertThrows(IllegalArgumentException.class, () -> {
-      HoodieSchema.parse(null);
+      String invalid = null;
+      HoodieSchema.parse(invalid);
     }, "Should throw exception for null schema string");
 
     assertThrows(IllegalArgumentException.class, () -> {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index be8ececd82e6..52b6e49620d6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.client.model.HoodieFlinkInternalRow;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
@@ -44,7 +45,6 @@ import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.RuntimeContextUtils;
 
-import org.apache.avro.Schema;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -217,7 +217,8 @@ public class BootstrapOperator
     Option<HoodieInstant> latestCommitTime = commitsTimeline.filterCompletedAndCompactionInstants().lastInstant();
 
     if (latestCommitTime.isPresent()) {
-      Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();
+      HoodieSchema schema =
+          new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableSchema();
 
       List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
           .getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().requestedTime())
@@ -250,7 +251,7 @@ public class BootstrapOperator
    *
    * @return A record key iterator for the file slice.
    */
-  private ClosableIterator<String> getRecordKeyIterator(FileSlice fileSlice, Schema tableSchema) throws IOException {
+  private ClosableIterator<String> getRecordKeyIterator(FileSlice fileSlice, HoodieSchema tableSchema) throws IOException {
     FileSlice scanFileSlice = new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
     // filter out crushed base file
     fileSlice.getBaseFile().map(f -> isValidFile(f.getPathInfo()) ? f : null).ifPresent(scanFileSlice::setBaseFile);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 001617f22a2e..8dbc59475300 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -20,7 +20,6 @@ package org.apache.hudi.sink.clustering;
 
 import org.apache.hudi.adapter.MaskingOutputAdapter;
 import org.apache.hudi.adapter.Utils;
-import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.CloseableConcatenatingIterator;
@@ -32,6 +31,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
@@ -53,13 +53,12 @@ import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.table.format.HoodieRowDataParquetReader;
 import org.apache.hudi.table.format.InternalSchemaManager;
-import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.DataTypeUtils;
 import org.apache.hudi.util.FlinkTaskContextSupplier;
 import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.utils.RuntimeContextUtils;
 
-import org.apache.avro.Schema;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Gauge;
@@ -107,8 +106,8 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
   private int taskID;
   private transient HoodieWriteConfig writeConfig;
   private transient HoodieFlinkTable<?> table;
-  private transient Schema schema;
-  private transient Schema readerSchema;
+  private transient HoodieSchema schema;
+  private transient HoodieSchema readerSchema;
   private transient HoodieFlinkWriteClient writeClient;
   private transient StreamRecordCollector<ClusteringCommitEvent> collector;
   private transient BinaryRowDataSerializer binarySerializer;
@@ -170,11 +169,11 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
     this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
     this.table = writeClient.getHoodieTable();
 
-    this.schema = AvroSchemaConverter.convertToSchema(rowType);
+    this.schema = HoodieSchemaConverter.convertToSchema(rowType);
     // Since there exists discrepancies between flink and spark dealing with nullability of primary key field,
     // and there may be some files written by spark, force update schema as nullable to make sure clustering
     // scan successfully without schema validating exception.
-    this.readerSchema = AvroSchemaUtils.asNullable(schema);
+    this.readerSchema = HoodieSchemaUtils.asNullable(schema);
 
     this.binarySerializer = new BinaryRowDataSerializer(rowType.getFieldCount());
 
@@ -312,8 +311,7 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
+        HoodieRowDataParquetReader fileReader = (HoodieRowDataParquetReader) fileReaderFactory.getFileReader(
+            table.getConfig(), new StoragePath(clusteringOp.getDataFilePath()));
 
-        //TODO boundary to revisit in later pr to use HoodieSchema directly
-        return new CloseableMappingIterator<>(fileReader.getRecordIterator(HoodieSchema.fromAvroSchema(readerSchema)), HoodieRecord::getData);
+        return new CloseableMappingIterator<>(fileReader.getRecordIterator(readerSchema), HoodieRecord::getData);
       } catch (IOException e) {
         throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
             + " and " + clusteringOp.getDeltaFilePaths(), e);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index b5c955e6d8dc..a9099b74c407 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.clustering;
 import org.apache.hudi.async.HoodieAsyncTableService;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -32,14 +33,13 @@ import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.compact.HoodieFlinkCompactor;
 import org.apache.hudi.table.HoodieFlinkTable;
-import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.ClusteringUtil;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 
 import com.beust.jcommander.JCommander;
-import org.apache.avro.Schema;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import 
org.apache.flink.client.deployment.application.ApplicationExecutionException;
@@ -319,8 +319,8 @@ public class HoodieFlinkClusteringJob {
       // Mark instant as clustering inflight
       
ClusteringUtils.transitionClusteringOrReplaceRequestedToInflight(instant, 
Option.empty(), table.getActiveTimeline());
 
-      final Schema tableAvroSchema = 
StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
-      final DataType rowDataType = 
AvroSchemaConverter.convertToDataType(tableAvroSchema);
+      final HoodieSchema tableSchema = 
StreamerUtil.getTableSchema(table.getMetaClient(), false);
+      final DataType rowDataType = 
HoodieSchemaConverter.convertToDataType(tableSchema);
       final RowType rowType = (RowType) rowDataType.getLogicalType();
 
       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
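
The clustering job now resolves the table schema once through StreamerUtil.getTableSchema
and derives the Flink row type from it. A hedged sketch of that resolution path, assuming a
meta client that points at an existing table:

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.util.HoodieSchemaConverter;
    import org.apache.hudi.util.StreamerUtil;

    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.RowType;

    public class ClusteringRowTypeSketch {
      // Mirrors the resolution above; `false` excludes the Hoodie metadata fields.
      static RowType resolveRowType(HoodieTableMetaClient metaClient) throws Exception {
        HoodieSchema tableSchema = StreamerUtil.getTableSchema(metaClient, false);
        DataType rowDataType = HoodieSchemaConverter.convertToDataType(tableSchema);
        return (RowType) rowDataType.getLogicalType();
      }
    }
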
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 976e12999671..721b3f94e470 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -27,7 +27,7 @@ import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.sink.utils.Pipelines;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.StreamerUtils;
 
@@ -73,7 +73,7 @@ public class HoodieFlinkStreamer {
     Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
     // Read from kafka source
     RowType rowType =
-        (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf).toAvroSchema())
+        (RowType) 
HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
             .getLogicalType();
 
     long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 5369e47bb115..37e5bbbfece0 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -21,13 +21,14 @@ package org.apache.hudi.table;
 import org.apache.hudi.adapter.DataStreamScanProviderAdapter;
 import org.apache.hudi.adapter.InputFormatSourceFunctionAdapter;
 import org.apache.hudi.adapter.TableFunctionProviderAdapter;
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -70,15 +71,14 @@ import 
org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.table.format.mor.MergeOnReadTableState;
 import org.apache.hudi.table.lookup.HoodieLookupFunction;
 import org.apache.hudi.table.lookup.HoodieLookupTableReader;
-import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.ChangelogModes;
 import org.apache.hudi.util.DataTypeUtils;
 import org.apache.hudi.util.ExpressionUtils;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.InputFormats;
 import org.apache.hudi.util.SerializableSchema;
 import org.apache.hudi.util.StreamerUtil;
 
-import org.apache.avro.Schema;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -439,8 +439,8 @@ public class HoodieTableSource implements
   }
 
   private InputFormat<RowData, ?> getBatchInputFormat() {
-    final Schema tableAvroSchema = getTableAvroSchema();
-    final DataType rowDataType = 
AvroSchemaConverter.convertToDataType(tableAvroSchema);
+    final HoodieSchema tableSchema = getTableSchema();
+    final DataType rowDataType = 
HoodieSchemaConverter.convertToDataType(tableSchema);
     final RowType rowType = (RowType) rowDataType.getLogicalType();
     final RowType requiredRowType = (RowType) 
getProducedDataType().notNull().getLogicalType();
 
@@ -456,7 +456,7 @@ public class HoodieTableSource implements
              LOG.info("No input splits generated for MERGE_ON_READ input 
format. Returning empty collection");
               return InputFormats.EMPTY_INPUT_FORMAT;
             }
-            return mergeOnReadInputFormat(rowType, requiredRowType, 
tableAvroSchema,
+            return mergeOnReadInputFormat(rowType, requiredRowType, 
tableSchema,
                 rowDataType, inputSplits, false);
           case COPY_ON_WRITE:
             return baseFileOnlyInputFormat();
@@ -480,9 +480,9 @@ public class HoodieTableSource implements
           LOG.info("No input splits generated for incremental read. Returning 
empty collection");
           return InputFormats.EMPTY_INPUT_FORMAT;
         } else if (cdcEnabled) {
-          return cdcInputFormat(rowType, requiredRowType, tableAvroSchema, 
rowDataType, result.getInputSplits());
+          return cdcInputFormat(rowType, requiredRowType, tableSchema, 
rowDataType, result.getInputSplits());
         } else {
-          return mergeOnReadInputFormat(rowType, requiredRowType, 
tableAvroSchema,
+          return mergeOnReadInputFormat(rowType, requiredRowType, tableSchema,
               rowDataType, result.getInputSplits(), false);
         }
       default:
@@ -494,8 +494,8 @@ public class HoodieTableSource implements
 
   private InputFormat<RowData, ?> getStreamInputFormat() {
     // if table does not exist or table data does not exist, use schema from 
the DDL
-    Schema tableAvroSchema = (this.metaClient == null || !tableDataExists()) ? 
inferSchemaFromDdl() : getTableAvroSchema();
-    final DataType rowDataType = 
AvroSchemaConverter.convertToDataType(tableAvroSchema);
+    HoodieSchema tableSchema = (this.metaClient == null || !tableDataExists()) 
? inferSchemaFromDdl() : getTableSchema();
+    final DataType rowDataType = 
HoodieSchemaConverter.convertToDataType(tableSchema);
     final RowType rowType = (RowType) rowDataType.getLogicalType();
     final RowType requiredRowType = (RowType) 
getProducedDataType().notNull().getLogicalType();
 
@@ -506,9 +506,9 @@ public class HoodieTableSource implements
         final HoodieTableType tableType = 
HoodieTableType.valueOf(this.conf.get(FlinkOptions.TABLE_TYPE));
         boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
         if (this.conf.get(FlinkOptions.CDC_ENABLED)) {
-          return cdcInputFormat(rowType, requiredRowType, tableAvroSchema, 
rowDataType, Collections.emptyList());
+          return cdcInputFormat(rowType, requiredRowType, tableSchema, 
rowDataType, Collections.emptyList());
         } else {
-          return mergeOnReadInputFormat(rowType, requiredRowType, 
tableAvroSchema,
+          return mergeOnReadInputFormat(rowType, requiredRowType, tableSchema,
               rowDataType, Collections.emptyList(), emitDelete);
         }
       default:
@@ -530,14 +530,14 @@ public class HoodieTableSource implements
   private MergeOnReadInputFormat cdcInputFormat(
       RowType rowType,
       RowType requiredRowType,
-      Schema tableAvroSchema,
+      HoodieSchema tableSchema,
       DataType rowDataType,
       List<MergeOnReadInputSplit> inputSplits) {
     final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
         rowType,
         requiredRowType,
-        tableAvroSchema.toString(),
-        AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
+        tableSchema.toString(),
+        HoodieSchemaConverter.convertToSchema(requiredRowType).toString(),
         inputSplits,
         conf.get(FlinkOptions.RECORD_KEY_FIELD).split(","));
     return CdcInputFormat.builder()
@@ -555,7 +555,7 @@ public class HoodieTableSource implements
   private MergeOnReadInputFormat mergeOnReadInputFormat(
       RowType rowType,
       RowType requiredRowType,
-      Schema tableAvroSchema,
+      HoodieSchema tableAvroSchema,
       DataType rowDataType,
       List<MergeOnReadInputSplit> inputSplits,
       boolean emitDelete) {
@@ -563,7 +563,7 @@ public class HoodieTableSource implements
         rowType,
         requiredRowType,
         tableAvroSchema.toString(),
-        AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
+        HoodieSchemaConverter.convertToSchema(requiredRowType).toString(),
         inputSplits,
         conf.get(FlinkOptions.RECORD_KEY_FIELD).split(","));
     return MergeOnReadInputFormat.builder()
@@ -608,9 +608,9 @@ public class HoodieTableSource implements
     );
   }
 
-  private Schema inferSchemaFromDdl() {
-    Schema schema = AvroSchemaConverter.convertToSchema(this.tableRowType);
-    return HoodieAvroUtils.addMetadataFields(schema, 
conf.get(FlinkOptions.CHANGELOG_ENABLED));
+  private HoodieSchema inferSchemaFromDdl() {
+    HoodieSchema schema = 
HoodieSchemaConverter.convertToSchema(this.tableRowType);
+    return HoodieSchemaUtils.addMetadataFields(schema, 
conf.get(FlinkOptions.CHANGELOG_ENABLED));
   }
 
   private FileIndex getOrBuildFileIndex() {
@@ -648,10 +648,10 @@ public class HoodieTableSource implements
   }
 
   @VisibleForTesting
-  public Schema getTableAvroSchema() {
+  public HoodieSchema getTableSchema() {
     try {
       TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
-      return schemaResolver.getTableAvroSchema();
+      return schemaResolver.getTableSchema();
     } catch (Throwable e) {
       // table exists but has no written data
       LOG.warn("Unable to resolve schema from table, using schema from the 
DDL", e);
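
When the table exists but has no committed data, the source falls back to the DDL row type
and widens it with the Hoodie metadata columns. A small sketch of that fallback, with the
changelog flag passed through as in inferSchemaFromDdl above:

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.schema.HoodieSchemaUtils;
    import org.apache.hudi.util.HoodieSchemaConverter;

    import org.apache.flink.table.types.logical.RowType;

    public class DdlSchemaFallbackSketch {
      // DDL row type -> HoodieSchema, then the _hoodie_* metadata fields are appended;
      // the boolean mirrors FlinkOptions.CHANGELOG_ENABLED as passed above.
      static HoodieSchema inferSchemaFromDdl(RowType tableRowType, boolean changelogEnabled) {
        HoodieSchema schema = HoodieSchemaConverter.convertToSchema(tableRowType);
        return HoodieSchemaUtils.addMetadataFields(schema, changelogEnabled);
      }
    }
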
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 22994a4a2808..5d810f88a7e6 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -22,6 +22,7 @@ import 
org.apache.hudi.adapter.HiveCatalogConstants.AlterHiveDatabaseOp;
 import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.ConfigUtils;
@@ -40,12 +41,11 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.table.HoodieTableFactory;
 import org.apache.hudi.table.format.FilePathUtils;
-import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.CatalogUtils;
 
-import org.apache.avro.Schema;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
@@ -422,7 +422,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     Table hiveTable = translateSparkTable2Flink(tablePath, 
getHiveTable(tablePath));
     String path = hiveTable.getSd().getLocation();
     Map<String, String> parameters = hiveTable.getParameters();
-    Schema latestTableSchema = StreamerUtil.getLatestTableSchema(path, 
hiveConf);
+    HoodieSchema latestTableSchema = StreamerUtil.getLatestTableSchema(path, 
hiveConf);
     org.apache.flink.table.api.Schema schema;
     if (latestTableSchema != null) {
       String pkColumnsStr = 
parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
@@ -430,7 +430,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
           ? null : StringUtils.split(pkColumnsStr, ",");
       // if the table is initialized from spark, the write schema is nullable 
for pk columns.
       DataType tableDataType = DataTypeUtils.ensureColumnsAsNonNullable(
-          AvroSchemaConverter.convertToDataType(latestTableSchema), pkColumns);
+          HoodieSchemaConverter.convertToDataType(latestTableSchema), 
pkColumns);
       org.apache.flink.table.api.Schema.Builder builder = 
org.apache.flink.table.api.Schema.newBuilder()
           .fromRowDataType(tableDataType);
       String pkConstraintName = parameters.get(PK_CONSTRAINT_NAME);
@@ -496,7 +496,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
 
   private HoodieTableMetaClient initTableIfNotExists(ObjectPath tablePath, 
CatalogTable catalogTable) {
     Configuration flinkConf = Configuration.fromMap(catalogTable.getOptions());
-    final String avroSchema = AvroSchemaConverter.convertToSchema(
+    final String avroSchema = HoodieSchemaConverter.convertToSchema(
         DataTypeUtils.toRowType(catalogTable.getUnresolvedSchema()),
         
AvroSchemaUtils.getAvroRecordQualifiedName(tablePath.getObjectName())).toString();
     flinkConf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);
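
Table initialization still persists an Avro-compatible schema string under SOURCE_AVRO_SCHEMA;
only the producer changes. A sketch of that conversion, assuming toString() on HoodieSchema
yields the Avro JSON form as it does above:

    import org.apache.hudi.avro.AvroSchemaUtils;
    import org.apache.hudi.util.HoodieSchemaConverter;

    import org.apache.flink.table.types.logical.RowType;

    public class CatalogSchemaSketch {
      // RowType + qualified record name -> Avro JSON string, as written into the Flink conf above.
      static String toSourceSchemaString(RowType rowType, String tableName) {
        String qualifiedName = AvroSchemaUtils.getAvroRecordQualifiedName(tableName);
        return HoodieSchemaConverter.convertToSchema(rowType, qualifiedName).toString();
      }
    }
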
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 4c27f54723c4..7df7987070ae 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -108,7 +108,7 @@ public class FlinkRowDataReaderContext extends 
HoodieReaderContext<RowData> {
             .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
             .getFileReader(tableConfig, filePath, HoodieFileFormat.PARQUET, 
Option.empty());
     DataType rowType = 
RowDataAvroQueryContexts.fromAvroSchema(dataSchema.toAvroSchema()).getRowType();
-    return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, 
requiredSchema.toAvroSchema(), 
getSafePredicates(requiredSchema.toAvroSchema()));
+    return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, 
requiredSchema, getSafePredicates(requiredSchema.toAvroSchema()));
   }
 
   @Override
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index c2f9ca0177b7..8741e66eeac9 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -22,6 +22,8 @@ import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.serialization.DefaultSerializer;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.InstantRange;
@@ -34,7 +36,6 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.source.ExpressionPredicates;
 import org.apache.hudi.util.FlinkClientUtil;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.generic.IndexedRecord;
@@ -55,13 +56,13 @@ public class FormatUtils {
 
   public static GenericRecord buildAvroRecordBySchema(
       IndexedRecord record,
-      Schema requiredSchema,
+      HoodieSchema requiredSchema,
       int[] requiredPos,
       GenericRecordBuilder recordBuilder) {
-    List<Schema.Field> requiredFields = requiredSchema.getFields();
+    List<HoodieSchemaField> requiredFields = requiredSchema.getFields();
     assert (requiredFields.size() == requiredPos.length);
     Iterator<Integer> positionIterator = Arrays.stream(requiredPos).iterator();
-    requiredFields.forEach(f -> recordBuilder.set(f, getVal(record, 
positionIterator.next())));
+    requiredFields.forEach(f -> recordBuilder.set(f.getAvroField(), 
getVal(record, positionIterator.next())));
     return recordBuilder.build();
   }
 
@@ -111,8 +112,8 @@ public class FormatUtils {
       HoodieWriteConfig writeConfig,
       InternalSchemaManager internalSchemaManager,
       FileSlice fileSlice,
-      Schema tableSchema,
-      Schema requiredSchema,
+      HoodieSchema tableSchema,
+      HoodieSchema requiredSchema,
       String latestInstant,
       String mergeType,
       boolean emitDelete,
@@ -134,8 +135,8 @@ public class FormatUtils {
         .withHoodieTableMetaClient(metaClient)
         .withLatestCommitTime(latestInstant)
         .withFileSlice(fileSlice)
-        .withDataSchema(tableSchema)
-        .withRequestedSchema(requiredSchema)
+        .withDataSchema(tableSchema.getAvroSchema())
+        .withRequestedSchema(requiredSchema.getAvroSchema())
         
.withInternalSchema(Option.ofNullable(internalSchemaManager.getQuerySchema()))
         .withProps(typedProps)
         .withShouldUseRecordPosition(false)
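
buildAvroRecordBySchema keeps feeding an Avro GenericRecordBuilder, so each HoodieSchemaField
is unwrapped back to its Avro field via getAvroField(). A sketch of that projection under the
same assumptions (the positions and the required schema are supplied by the caller):

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.schema.HoodieSchemaField;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.generic.GenericRecordBuilder;
    import org.apache.avro.generic.IndexedRecord;

    import java.util.List;

    public class ProjectionSketch {
      // Copies the positions listed in requiredPos from the full record into a record
      // shaped by requiredSchema, mirroring FormatUtils#buildAvroRecordBySchema.
      static GenericRecord project(IndexedRecord record, HoodieSchema requiredSchema, int[] requiredPos) {
        GenericRecordBuilder builder = new GenericRecordBuilder(requiredSchema.getAvroSchema());
        List<HoodieSchemaField> fields = requiredSchema.getFields();
        for (int i = 0; i < fields.size(); i++) {
          builder.set(fields.get(i).getAvroField(), record.get(requiredPos[i]));
        }
        return builder.build();
      }
    }
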
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
index 54c985699de9..4465124c9a89 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
@@ -18,12 +18,12 @@
 
 package org.apache.hudi.table.format;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.model.HoodieFlinkRecord;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
@@ -34,13 +34,12 @@ import 
org.apache.hudi.io.storage.row.parquet.ParquetSchemaConverter;
 import org.apache.hudi.source.ExpressionPredicates.Predicate;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.util.AvroSchemaConverter;
 
-import org.apache.avro.Schema;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.parquet.schema.MessageType;
 
 import java.io.IOException;
@@ -57,7 +56,7 @@ public class HoodieRowDataParquetReader implements 
HoodieFileReader<RowData>  {
   private final HoodieStorage storage;
   private final ParquetUtils parquetUtils;
   private final StoragePath path;
-  private Schema fileSchema;
+  private HoodieSchema fileSchema;
   private DataType fileRowType;
   private final List<ClosableIterator<RowData>> readerIterators = new 
ArrayList<>();
 
@@ -84,15 +83,14 @@ public class HoodieRowDataParquetReader implements 
HoodieFileReader<RowData>  {
 
   @Override
   public ClosableIterator<HoodieRecord<RowData>> 
getRecordIterator(HoodieSchema readerSchema, HoodieSchema requestedSchema) 
throws IOException {
-    //TODO boundary to follow up in later pr
-    ClosableIterator<RowData> rowDataItr = 
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), 
requestedSchema.getAvroSchema(), Collections.emptyList());
+    ClosableIterator<RowData> rowDataItr = 
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), 
requestedSchema, Collections.emptyList());
     readerIterators.add(rowDataItr);
     return new CloseableMappingIterator<>(rowDataItr, HoodieFlinkRecord::new);
   }
 
   @Override
   public ClosableIterator<String> getRecordKeyIterator() throws IOException {
-    Schema schema = HoodieAvroUtils.getRecordKeySchema();
+    HoodieSchema schema = HoodieSchemaUtils.getRecordKeySchema();
     ClosableIterator<RowData> rowDataItr = 
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), schema, 
Collections.emptyList());
     return new CloseableMappingIterator<>(rowDataItr, rowData -> 
Objects.toString(rowData.getString(0)));
   }
@@ -100,7 +98,7 @@ public class HoodieRowDataParquetReader implements 
HoodieFileReader<RowData>  {
   public ClosableIterator<RowData> getRowDataIterator(
       InternalSchemaManager internalSchemaManager,
       DataType dataType,
-      Schema requestedSchema,
+      HoodieSchema requestedSchema,
       List<Predicate> predicates) throws IOException {
     return RecordIterators.getParquetRecordIterator(storage.getConf(), 
internalSchemaManager, dataType, requestedSchema, path, predicates);
   }
@@ -108,10 +106,9 @@ public class HoodieRowDataParquetReader implements 
HoodieFileReader<RowData>  {
   @Override
   public HoodieSchema getSchema() {
     if (fileSchema == null) {
-      fileSchema = 
AvroSchemaConverter.convertToSchema(getRowType().notNull().getLogicalType());
+      fileSchema = 
HoodieSchemaConverter.convertToSchema(getRowType().notNull().getLogicalType());
     }
-    //TODO to revisit in later pr to use HoodieSchema directly
-    return HoodieSchema.fromAvroSchema(fileSchema);
+    return fileSchema;
   }
 
   public DataType getRowType() {
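
The reader now caches the derived file schema as a HoodieSchema and computes it lazily from
the Parquet-backed row type. A minimal sketch of that lazy derivation, with the row type
injected for illustration:

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.util.HoodieSchemaConverter;

    import org.apache.flink.table.types.DataType;

    public class LazyFileSchemaSketch {
      private final DataType fileRowType;
      private HoodieSchema fileSchema;

      LazyFileSchemaSketch(DataType fileRowType) {
        this.fileRowType = fileRowType;
      }

      // Derived once from the file's row type, non-null at the top level,
      // mirroring HoodieRowDataParquetReader#getSchema.
      HoodieSchema getSchema() {
        if (fileSchema == null) {
          fileSchema = HoodieSchemaConverter.convertToSchema(fileRowType.notNull().getLogicalType());
        }
        return fileSchema;
      }
    }
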
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
index 120e50e5388f..6a4ed021c2bc 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.table.format;
 
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -29,7 +31,6 @@ import org.apache.hudi.storage.inline.InLineFSUtils;
 import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
 import org.apache.hudi.util.RowDataProjection;
 
-import org.apache.avro.Schema;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
@@ -62,12 +63,12 @@ public abstract class RecordIterators {
       StorageConfiguration<?> conf,
       InternalSchemaManager internalSchemaManager,
       DataType dataType,
-      Schema requestedSchema,
+      HoodieSchema requestedSchema,
       StoragePath path,
       List<Predicate> predicates) throws IOException {
     List<String> fieldNames = ((RowType) 
dataType.getLogicalType()).getFieldNames();
     List<DataType> fieldTypes = dataType.getChildren();
-    int[] selectedFields = 
requestedSchema.getFields().stream().map(Schema.Field::name)
+    int[] selectedFields = 
requestedSchema.getFields().stream().map(HoodieSchemaField::name)
         .map(fieldNames::indexOf)
         .mapToInt(i -> i)
         .toArray();
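
The projected column indexes are computed from HoodieSchemaField names instead of Avro
Schema.Field names. A small sketch of that index mapping, with the file's field names passed
in directly:

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.schema.HoodieSchemaField;

    import java.util.List;

    public class SelectedFieldsSketch {
      // Maps every requested field to its position in the full file schema,
      // mirroring RecordIterators#getParquetRecordIterator.
      static int[] selectedFields(HoodieSchema requestedSchema, List<String> fileFieldNames) {
        return requestedSchema.getFields().stream()
            .map(HoodieSchemaField::name)
            .mapToInt(fileFieldNames::indexOf)
            .toArray();
      }
    }
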
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index 195c636428df..17d6f93773ca 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -18,8 +18,7 @@
 
 package org.apache.hudi.table.format.cdc;
 
-import org.apache.hudi.avro.AvroSchemaCache;
-import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
 import org.apache.hudi.client.model.HoodieFlinkRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
@@ -31,6 +30,7 @@ import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
@@ -69,7 +69,6 @@ import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.RowDataProjection;
 import org.apache.hudi.util.StreamerUtil;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.flink.configuration.Configuration;
@@ -141,9 +140,10 @@ public class CdcInputFormat extends MergeOnReadInputFormat 
{
   private ClosableIterator<RowData> getFileSliceIterator(MergeOnReadInputSplit 
split) {
     try {
       // get full schema iterator.
-      final Schema tableSchema = AvroSchemaCache.intern(new 
Schema.Parser().parse(tableState.getAvroSchema()));
+      final HoodieSchema schema = HoodieSchemaCache.intern(
+          HoodieSchema.parse(tableState.getAvroSchema()));
       // before/after images have assumption of snapshot scan, so `emitDelete` 
is set as false
-      return getSplitRowIterator(split, tableSchema, tableSchema, 
FlinkOptions.REALTIME_PAYLOAD_COMBINE, false);
+      return getSplitRowIterator(split, schema, schema, 
FlinkOptions.REALTIME_PAYLOAD_COMBINE, false);
     } catch (IOException e) {
       throw new HoodieException("Failed to create iterator for split: " + 
split, e);
     }
@@ -215,9 +215,10 @@ public class CdcInputFormat extends MergeOnReadInputFormat 
{
    * @return {@link RowData} iterator for the given split.
    */
   private ClosableIterator<HoodieRecord<RowData>> 
getSplitRecordIterator(MergeOnReadInputSplit split) throws IOException {
-    final Schema tableSchema = AvroSchemaCache.intern(new 
Schema.Parser().parse(tableState.getAvroSchema()));
+    final HoodieSchema schema = HoodieSchemaCache.intern(
+        HoodieSchema.parse(tableState.getAvroSchema()));
     HoodieFileGroupReader<RowData> fileGroupReader =
-        createFileGroupReader(split, tableSchema, tableSchema, 
FlinkOptions.REALTIME_PAYLOAD_COMBINE, true);
+        createFileGroupReader(split, schema, schema, 
FlinkOptions.REALTIME_PAYLOAD_COMBINE, true);
     return fileGroupReader.getClosableHoodieRecordIterator();
   }
 
@@ -474,7 +475,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
   }
 
   abstract static class BaseImageIterator implements ClosableIterator<RowData> 
{
-    private final Schema requiredSchema;
+    private final HoodieSchema requiredSchema;
     private final int[] requiredPos;
     private final GenericRecordBuilder recordBuilder;
     private final AvroToRowDataConverters.AvroToRowDataConverter 
avroToRowDataConverter;
@@ -494,9 +495,9 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
         MergeOnReadTableState tableState,
         HoodieSchema cdcSchema,
         HoodieCDCFileSplit fileSplit) {
-      this.requiredSchema = new 
Schema.Parser().parse(tableState.getRequiredAvroSchema());
+      this.requiredSchema = 
HoodieSchema.parse(tableState.getRequiredAvroSchema());
       this.requiredPos = getRequiredPos(tableState.getAvroSchema(), 
this.requiredSchema);
-      this.recordBuilder = new GenericRecordBuilder(requiredSchema);
+      this.recordBuilder = new 
GenericRecordBuilder(requiredSchema.getAvroSchema());
       this.avroToRowDataConverter = 
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
       StoragePath hadoopTablePath = new StoragePath(tablePath);
       HoodieStorage storage = new HoodieHadoopStorage(tablePath, hadoopConf);
@@ -511,9 +512,9 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
       this.cdcItr = new HoodieCDCLogRecordIterator(storage, cdcLogFiles, 
cdcSchema.toAvroSchema());
     }
 
-    private int[] getRequiredPos(String tableSchema, Schema required) {
-      Schema dataSchema = HoodieAvroUtils.removeMetadataFields(new 
Schema.Parser().parse(tableSchema));
-      List<String> fields = 
dataSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+    private int[] getRequiredPos(String tableSchema, HoodieSchema required) {
+      HoodieSchema dataSchema = 
HoodieSchemaUtils.removeMetadataFields(HoodieSchema.parse(tableSchema));
+      List<String> fields = 
dataSchema.getFields().stream().map(HoodieSchemaField::name).collect(Collectors.toList());
       return required.getFields().stream()
           .map(f -> fields.indexOf(f.name()))
           .mapToInt(i -> i)
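
Schemas carried as strings in the MERGE_ON_READ table state are rehydrated with
HoodieSchema.parse and deduplicated through HoodieSchemaCache.intern, replacing the Avro
parser and cache. A sketch of that pattern, with an assumed schema JSON string:

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.schema.HoodieSchemaCache;

    public class SchemaInternSketch {
      // parse() replaces new Schema.Parser().parse(...); intern() keeps a single shared
      // instance per schema so repeated splits do not re-allocate it.
      static HoodieSchema rehydrate(String schemaJson) {
        return HoodieSchemaCache.intern(HoodieSchema.parse(schemaJson));
      }
    }
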
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 01b718b0a97a..8a1e8b279822 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -18,11 +18,12 @@
 
 package org.apache.hudi.table.format.mor;
 
-import org.apache.hudi.avro.AvroSchemaCache;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -39,7 +40,6 @@ import org.apache.hudi.table.format.RecordIterators;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.StreamerUtil;
 
-import org.apache.avro.Schema;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.RichInputFormat;
@@ -187,8 +187,10 @@ public class MergeOnReadInputFormat
             + "hoodie table path: " + split.getTablePath()
             + "flink partition Index: " + split.getSplitNumber()
             + "merge type: " + split.getMergeType());
-    final Schema tableSchema = AvroSchemaCache.intern(new 
Schema.Parser().parse(tableState.getAvroSchema()));
-    final Schema requiredSchema = AvroSchemaCache.intern(new 
Schema.Parser().parse(tableState.getRequiredAvroSchema()));
+    final HoodieSchema tableSchema = HoodieSchemaCache.intern(
+        HoodieSchema.parse(tableState.getAvroSchema()));
+    final HoodieSchema requiredSchema = HoodieSchemaCache.intern(
+        HoodieSchema.parse(tableState.getRequiredAvroSchema()));
     return getSplitRowIterator(split, tableSchema, requiredSchema, mergeType, 
emitDelete);
   }
 
@@ -301,8 +303,8 @@ public class MergeOnReadInputFormat
    */
   protected ClosableIterator<RowData> getSplitRowIterator(
       MergeOnReadInputSplit split,
-      Schema tableSchema,
-      Schema requiredSchema,
+      HoodieSchema tableSchema,
+      HoodieSchema requiredSchema,
       String mergeType,
       boolean emitDelete) throws IOException {
     HoodieFileGroupReader<RowData> fileGroupReader = 
createFileGroupReader(split, tableSchema, requiredSchema, mergeType, 
emitDelete);
@@ -322,8 +324,8 @@ public class MergeOnReadInputFormat
    */
   protected HoodieFileGroupReader<RowData> createFileGroupReader(
       MergeOnReadInputSplit split,
-      Schema tableSchema,
-      Schema requiredSchema,
+      HoodieSchema tableSchema,
+      HoodieSchema requiredSchema,
       String mergeType,
       boolean emitDelete) {
     FileSlice fileSlice = new FileSlice(
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index 446533f5dbc5..73c7a336909d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -33,7 +33,7 @@ import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.sink.compact.FlinkCompactionConfig;
 import org.apache.hudi.table.HoodieFlinkTable;
 
-import org.apache.avro.Schema;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.flink.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,7 +75,7 @@ public class CompactionUtil {
    */
   public static void setAvroSchema(Configuration conf, HoodieTableMetaClient 
metaClient) throws Exception {
     TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
-    Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchema(false);
+    HoodieSchema tableAvroSchema = tableSchemaResolver.getTableSchema(false);
     conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, tableAvroSchema.toString());
   }
 
@@ -87,7 +87,7 @@ public class CompactionUtil {
    */
   public static void setAvroSchema(HoodieWriteConfig writeConfig, 
HoodieTableMetaClient metaClient) throws Exception {
     TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
-    Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchema(false);
+    HoodieSchema tableAvroSchema = tableSchemaResolver.getTableSchema(false);
     writeConfig.setSchema(tableAvroSchema.toString());
   }
 
@@ -131,8 +131,8 @@ public class CompactionUtil {
    */
   public static void inferChangelogMode(Configuration conf, 
HoodieTableMetaClient metaClient) throws Exception {
     TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
-    Schema tableAvroSchema = 
tableSchemaResolver.getTableAvroSchemaFromDataFile();
-    if (tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != 
null) {
+    HoodieSchema tableAvroSchema = 
HoodieSchema.fromAvroSchema(tableSchemaResolver.getTableAvroSchemaFromDataFile());
+    if 
(tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD).isPresent()) {
       conf.set(FlinkOptions.CHANGELOG_ENABLED, true);
     }
   }
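
HoodieSchema#getField returns an optional-style wrapper rather than a nullable Avro field,
hence the isPresent() check above. A sketch of the changelog inference with the schema passed
in directly (the real method resolves it from the data files via TableSchemaResolver):

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.configuration.FlinkOptions;

    import org.apache.flink.configuration.Configuration;

    public class ChangelogInferenceSketch {
      // Flips CHANGELOG_ENABLED on when the data-file schema carries the
      // _hoodie_operation metadata field, mirroring CompactionUtil#inferChangelogMode.
      static void inferChangelogMode(Configuration conf, HoodieSchema dataFileSchema) {
        if (dataFileSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD).isPresent()) {
          conf.set(FlinkOptions.CHANGELOG_ENABLED, true);
        }
      }
    }
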
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/JsonDeserializationFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/JsonDeserializationFunction.java
index c1fbc80982a9..6b8e3e48786f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/JsonDeserializationFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/JsonDeserializationFunction.java
@@ -41,7 +41,7 @@ public final class JsonDeserializationFunction
   public static JsonDeserializationFunction getInstance(Configuration conf) {
     // Read from file source
     RowType rowType =
-        (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf).toAvroSchema())
+        (RowType) 
HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
             .getLogicalType();
     return getInstance(rowType);
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 297e93bd12a5..e83b3852fd13 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -71,7 +71,6 @@ import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.streamer.FlinkStreamerConfig;
 
-import org.apache.avro.Schema;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -609,19 +608,19 @@ public class StreamerUtil {
     return (long) conf.get(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024;
   }
 
-  public static Schema getTableAvroSchema(HoodieTableMetaClient metaClient, 
boolean includeMetadataFields) throws Exception {
+  public static HoodieSchema getTableSchema(HoodieTableMetaClient metaClient, 
boolean includeMetadataFields) throws Exception {
     TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
-    return schemaUtil.getTableAvroSchema(includeMetadataFields);
+    return schemaUtil.getTableSchema(includeMetadataFields);
   }
 
-  public static Schema getLatestTableSchema(String path, 
org.apache.hadoop.conf.Configuration hadoopConf) {
+  public static HoodieSchema getLatestTableSchema(String path, 
org.apache.hadoop.conf.Configuration hadoopConf) {
     if (StringUtils.isNullOrEmpty(path) || !StreamerUtil.tableExists(path, 
hadoopConf)) {
       return null;
     }
 
     try {
       HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(path, 
hadoopConf);
-      return getTableAvroSchema(metaClient, false);
+      return getTableSchema(metaClient, false);
     } catch (Exception e) {
       LOG.error("Failed to resolve the latest table schema", e);
     }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamV2Write.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamV2Write.java
index 28d9e81e2307..28e513e74da7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamV2Write.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamV2Write.java
@@ -22,7 +22,7 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.sink.v2.utils.PipelinesV2;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.JsonDeserializationFunction;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.FlinkMiniCluster;
@@ -114,7 +114,7 @@ public class ITTestDataStreamV2Write {
 
     // Read from file source
     RowType rowType =
-        (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf).toAvroSchema())
+            (RowType) 
HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
             .getLogicalType();
 
     String sourcePath = Objects.requireNonNull(Thread.currentThread()
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 0e4cf37b528c..d5bd4f895bc1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -38,8 +38,8 @@ import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.table.catalog.HoodieCatalog;
 import org.apache.hudi.table.catalog.TableOptionProperties;
-import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.HoodiePipeline;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.JsonDeserializationFunction;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.FlinkMiniCluster;
@@ -283,7 +283,7 @@ public class ITTestDataStreamWrite extends TestLogger {
 
     // Read from file source
     RowType rowType =
-        (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf).toAvroSchema())
+        (RowType) 
HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
             .getLogicalType();
 
     String sourcePath = Objects.requireNonNull(Thread.currentThread()
@@ -333,7 +333,7 @@ public class ITTestDataStreamWrite extends TestLogger {
 
     // Read from file source
     RowType rowType =
-        (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf).toAvroSchema())
+        (RowType) 
HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
             .getLogicalType();
 
     String sourcePath = Objects.requireNonNull(Thread.currentThread()
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
index ed134868c338..d8e67f636d06 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
@@ -27,7 +27,7 @@ import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.utils.Pipelines;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.JsonDeserializationFunction;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.FlinkMiniCluster;
@@ -147,7 +147,7 @@ public class ITTestConsistentBucketStreamWrite extends 
TestLogger {
 
     // Read from file source
     RowType rowType =
-        (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf).toAvroSchema())
+        (RowType) 
HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
             .getLogicalType();
 
     String sourcePath = Objects.requireNonNull(Thread.currentThread()
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index 6289f9af27b1..4fbf90e2e421 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -24,6 +24,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -38,16 +39,15 @@ import 
org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
 import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
 import org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob;
 import org.apache.hudi.table.HoodieFlinkTable;
-import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.FlinkMiniCluster;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 import org.apache.hudi.utils.TestSQL;
 
-import org.apache.avro.Schema;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
@@ -180,8 +180,8 @@ public class ITTestHoodieFlinkClustering {
       HoodieInstant instant = 
INSTANT_GENERATOR.getClusteringCommitRequestedInstant(clusteringInstantTime.get());
       table.getActiveTimeline().transitionClusterRequestedToInflight(instant, 
Option.empty());
 
-      final Schema tableAvroSchema = 
StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
-      final DataType rowDataType = 
AvroSchemaConverter.convertToDataType(tableAvroSchema);
+      final HoodieSchema tableSchema = 
StreamerUtil.getTableSchema(table.getMetaClient(), false);
+      final DataType rowDataType = 
HoodieSchemaConverter.convertToDataType(tableSchema);
       final RowType rowType = (RowType) rowDataType.getLogicalType();
 
       DataStream<ClusteringCommitEvent> dataStream = env.addSource(new 
ClusteringPlanSourceFunction(clusteringInstantTime.get(), clusteringPlan, conf))
@@ -383,8 +383,8 @@ public class ITTestHoodieFlinkClustering {
       HoodieInstant instant = 
INSTANT_GENERATOR.getClusteringCommitRequestedInstant(firstClusteringInstant.get());
       table.getActiveTimeline().transitionClusterRequestedToInflight(instant, 
Option.empty());
 
-      final Schema tableAvroSchema = 
StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
-      final DataType rowDataType = 
AvroSchemaConverter.convertToDataType(tableAvroSchema);
+      final HoodieSchema tableAvroSchema = 
StreamerUtil.getTableSchema(table.getMetaClient(), false);
+      final DataType rowDataType = 
HoodieSchemaConverter.convertToDataType(tableAvroSchema);
       final RowType rowType = (RowType) rowDataType.getLogicalType();
 
       DataStream<ClusteringCommitEvent> dataStream =
@@ -752,8 +752,8 @@ public class ITTestHoodieFlinkClustering {
       HoodieInstant instant = 
INSTANT_GENERATOR.getClusteringCommitRequestedInstant(clusteringInstantTime.get());
       table.getActiveTimeline().transitionClusterRequestedToInflight(instant, 
Option.empty());
 
-      final Schema tableAvroSchema = 
StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
-      final DataType rowDataType = 
AvroSchemaConverter.convertToDataType(tableAvroSchema);
+      final HoodieSchema tableAvroSchema = 
StreamerUtil.getTableSchema(table.getMetaClient(), false);
+      final DataType rowDataType = 
HoodieSchemaConverter.convertToDataType(tableAvroSchema);
       final RowType rowType = (RowType) rowDataType.getLogicalType();
 
       DataStream<ClusteringCommitEvent> dataStream = env.addSource(new 
ClusteringPlanSourceFunction(clusteringInstantTime.get(), clusteringPlan, conf))
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCustomSerDe.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCustomSerDe.java
index 4b5ce9dc0816..70f8cacfd992 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCustomSerDe.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCustomSerDe.java
@@ -22,13 +22,12 @@ import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.serialization.DefaultSerializer;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.util.collection.BitCaskDiskMap;
 import org.apache.hudi.common.util.collection.RocksDbDiskMap;
 
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericFixed;
 import org.junit.jupiter.api.BeforeEach;
@@ -72,11 +71,16 @@ public class TestCustomSerDe extends 
HoodieCommonTestHarness {
   }
 
   private static HoodieRecord createAvroRecordWithDecimalOrderingField() {
-    Schema precombineFieldSchema = LogicalTypes.decimal(20, 0)
-        .addToSchema(Schema.createFixed("fixed", null, 
"record.precombineField", 9));
+    HoodieSchema decimalSchema = HoodieSchema.createDecimal(
+        "fixed",                    // name
+        "record.precombineField",   // namespace
+        null,                       // doc
+        20,                         // precision
+        0,                          // scale
+        9                           // fixedSize in bytes
+    );
     byte[] decimalFieldBytes = new byte[] {0, 0, 0, 1, -122, -16, -116, -90, 
-32};
-    GenericFixed genericFixed = new GenericData.Fixed(precombineFieldSchema, 
decimalFieldBytes);
-
+    GenericFixed genericFixed = new 
GenericData.Fixed(decimalSchema.getAvroSchema(), decimalFieldBytes);
     // nullifying the record attribute in EventTimeAvroPayload here as it is 
not required in the test
     return new HoodieAvroRecord<>(new HoodieKey("recordKey", "partitionPath"),
         new EventTimeAvroPayload(null, (Comparable) genericFixed));
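
The fixed-width decimal schema is now created directly on HoodieSchema rather than by
attaching an Avro logical type to a fixed schema. A sketch of the equivalent construction,
with the argument roles taken from the inline comments above:

    import org.apache.hudi.common.schema.HoodieSchema;

    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericFixed;

    public class DecimalSchemaSketch {
      // decimal(20, 0) backed by a 9-byte fixed value, as in the test above.
      static GenericFixed decimalFixed(byte[] unscaledBytes) {
        HoodieSchema decimalSchema = HoodieSchema.createDecimal(
            "fixed", "record.precombineField", null, 20, 0, 9);
        // The Avro view of the schema still backs the GenericFixed value.
        return new GenericData.Fixed(decimalSchema.getAvroSchema(), unscaledBytes);
      }
    }
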
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
index 03f01dc192b8..56b0017a8bce 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
@@ -28,7 +28,7 @@ import org.apache.hudi.sink.bucket.BucketStreamWriteFunction;
 import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 
@@ -103,7 +103,7 @@ public class BucketStreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<
     this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, 
environment);
     this.gateway = new MockOperatorEventGateway();
     this.conf = conf;
-    this.rowType = (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf).toAvroSchema()).getLogicalType();
+    this.rowType = (RowType) 
HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
     // one function
     this.coordinatorContext = new MockOperatorCoordinatorContext(new 
OperatorID(), 1);
     this.coordinator = new StreamWriteOperatorCoordinator(conf, 
this.coordinatorContext);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
index 63cdc353d5a1..7ecd3c079bdc 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
@@ -32,7 +32,7 @@ import org.apache.hudi.sink.bulk.sort.SortOperator;
 import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
 import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -94,7 +94,7 @@ public class BulkInsertFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, 
environment);
     this.gateway = new MockOperatorEventGateway();
     this.conf = conf;
-    this.rowType = (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf).toAvroSchema()).getLogicalType();
+    this.rowType = (RowType) 
HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
     this.rowTypeWithFileId = 
BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
     this.coordinatorContext = new MockOperatorCoordinatorContext(new 
OperatorID(), 1);
     this.coordinator = new StreamWriteOperatorCoordinator(conf, 
this.coordinatorContext);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
index 41fe1f4568ce..62d24d346d06 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
@@ -26,7 +26,7 @@ import org.apache.hudi.sink.append.AppendWriteFunctions;
 import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
 import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -84,7 +84,7 @@ public class InsertFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     this.gateway = new MockOperatorEventGateway();
     this.subtaskGateway = new MockSubtaskGateway();
     this.conf = conf;
-    this.rowType = (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf).toAvroSchema()).getLogicalType();
+    this.rowType = (RowType) 
HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
     // one function
     this.coordinatorContext = new MockOperatorCoordinatorContext(new 
OperatorID(), 1);
     this.coordinator = new StreamWriteOperatorCoordinator(conf, 
this.coordinatorContext);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 5ef8d90bb1c0..d5f1a31385fd 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -32,7 +32,7 @@ import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 
@@ -122,7 +122,7 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
     this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
     this.gateway = new MockOperatorEventGateway();
     this.conf = conf;
-    this.rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf).toAvroSchema()).getLogicalType();
+    this.rowType = (RowType) HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
     // one function
     this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
     this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
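
For readers following the three test-wrapper changes above: in each wrapper the migration reduces to deriving the Flink RowType straight from HoodieSchema instead of unwrapping it to an Avro Schema first. A minimal sketch of that pattern follows; it assumes a Flink Configuration that carries the source schema, and the class and method names are illustrative only.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.RowType;

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.util.HoodieSchemaConverter;
    import org.apache.hudi.util.StreamerUtil;

    // Sketch: resolve the source schema as HoodieSchema and convert it to a Flink RowType.
    public class RowTypeFromHoodieSchemaSketch {
      static RowType rowTypeFor(Configuration conf) {
        HoodieSchema sourceSchema = StreamerUtil.getSourceSchema(conf);
        // Previously this went through AvroSchemaConverter.convertToDataType(sourceSchema.toAvroSchema()).
        DataType dataType = HoodieSchemaConverter.convertToDataType(sourceSchema);
        return (RowType) dataType.getLogicalType();
      }
    }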
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index c644d70e2163..1f5b41bc0f91 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.source;
 
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -31,7 +32,6 @@ import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 import org.apache.hudi.utils.TestUtils;
 
-import org.apache.avro.Schema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
@@ -247,18 +247,18 @@ public class TestStreamReadOperator {
 
     // This input format is used to opening the emitted split.
     TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
-    final Schema tableAvroSchema;
+    final HoodieSchema tableSchema;
     try {
-      tableAvroSchema = schemaResolver.getTableAvroSchema();
+      tableSchema = schemaResolver.getTableSchema();
     } catch (Exception e) {
       throw new HoodieException("Get table avro schema error", e);
     }
-    final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
+    final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableSchema.getAvroSchema());
     final RowType rowType = (RowType) rowDataType.getLogicalType();
     final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
         rowType,
         TestConfigurations.ROW_TYPE,
-        tableAvroSchema.toString(),
+        tableSchema.toString(),
         AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(),
         Collections.emptyList(),
         new String[0]);
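
The TestStreamReadOperator change applies the same idea on the read path: the table schema is now resolved as a HoodieSchema via TableSchemaResolver#getTableSchema, and the Avro view is only materialized where an Avro-based converter is still the entry point. A minimal sketch, assuming an existing HoodieTableMetaClient (variable and method names are illustrative):

    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.RowType;

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.TableSchemaResolver;
    import org.apache.hudi.exception.HoodieException;
    import org.apache.hudi.util.AvroSchemaConverter;

    // Sketch: resolve the table schema as HoodieSchema, then convert through its Avro view.
    public class TableRowTypeSketch {
      static RowType tableRowType(HoodieTableMetaClient metaClient) {
        final HoodieSchema tableSchema;
        try {
          tableSchema = new TableSchemaResolver(metaClient).getTableSchema();
        } catch (Exception e) {
          throw new HoodieException("Get table schema error", e);
        }
        DataType rowDataType = AvroSchemaConverter.convertToDataType(tableSchema.getAvroSchema());
        return (RowType) rowDataType.getLogicalType();
      }
    }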
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
index 601625242991..e217bb5ab9bc 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
@@ -22,6 +22,9 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.CompactionUtils;
@@ -41,8 +44,6 @@ import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.utils.FlinkMiniCluster;
 
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -63,6 +64,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -245,35 +247,44 @@ public class ITTestSchemaEvolution {
         doCompact(conf);
       }
 
-      Schema intType = SchemaBuilder.unionOf().nullType().and().intType().endUnion();
-      Schema longType = SchemaBuilder.unionOf().nullType().and().longType().endUnion();
-      Schema doubleType = SchemaBuilder.unionOf().nullType().and().doubleType().endUnion();
-      Schema stringType = SchemaBuilder.unionOf().nullType().and().stringType().endUnion();
-      Schema structType = SchemaBuilder.builder().record("new_row_col").fields()
-              .name("f0").type(longType).noDefault()
-              .name("f1").type(stringType).noDefault().endRecord();
-      Schema arrayType = Schema.createUnion(SchemaBuilder.builder().array().items(stringType), SchemaBuilder.builder().nullType());
-      Schema mapType = Schema.createUnion(SchemaBuilder.builder().map().values(stringType), SchemaBuilder.builder().nullType());
-
-      writeClient.addColumn("salary", doubleType, null, "name", AFTER);
+      // Create nullable primitive types using HoodieSchema
+      HoodieSchema intType = HoodieSchema.createNullable(HoodieSchemaType.INT);
+      HoodieSchema longType = HoodieSchema.createNullable(HoodieSchemaType.LONG);
+      HoodieSchema doubleType = HoodieSchema.createNullable(HoodieSchemaType.DOUBLE);
+      HoodieSchema stringType = HoodieSchema.createNullable(HoodieSchemaType.STRING);
+
+      // Create struct type with fields
+      List<HoodieSchemaField> structFields = Arrays.asList(
+          HoodieSchemaField.of("f0", longType, null, HoodieSchema.NULL_VALUE),
+          HoodieSchemaField.of("f1", stringType, null, HoodieSchema.NULL_VALUE)
+      );
+      HoodieSchema structType = HoodieSchema.createRecord("new_row_col", null, null, structFields);
+
+      // Create nullable array type
+      HoodieSchema arrayType = HoodieSchema.createNullable(HoodieSchema.createArray(stringType));
+
+      // Create nullable map type
+      HoodieSchema mapType = HoodieSchema.createNullable(HoodieSchema.createMap(stringType));
+
+      writeClient.addColumn("salary", doubleType.getAvroSchema(), null, 
"name", AFTER);
       writeClient.deleteColumns("gender");
       writeClient.renameColumn("name", "first_name");
       writeClient.updateColumnType("age", Types.StringType.get());
-      writeClient.addColumn("last_name", stringType, "empty allowed", 
"salary", BEFORE);
+      writeClient.addColumn("last_name", stringType.getAvroSchema(), "empty 
allowed", "salary", BEFORE);
       writeClient.reOrderColPosition("age", "first_name", BEFORE);
       // add a field in the middle of the `f_struct` and `f_row_map` columns
-      writeClient.addColumn("f_struct.f2", intType, "add field in middle of 
struct", "f_struct.f0", AFTER);
-      writeClient.addColumn("f_row_map.value.f2", intType, "add field in 
middle of struct", "f_row_map.value.f0", AFTER);
+      writeClient.addColumn("f_struct.f2", intType.getAvroSchema(), "add field 
in middle of struct", "f_struct.f0", AFTER);
+      writeClient.addColumn("f_row_map.value.f2", intType.getAvroSchema(), 
"add field in middle of struct", "f_row_map.value.f0", AFTER);
       // add a field at the end of `f_struct` and `f_row_map` column
-      writeClient.addColumn("f_struct.f3", stringType);
-      writeClient.addColumn("f_row_map.value.f3", stringType);
+      writeClient.addColumn("f_struct.f3", stringType.getAvroSchema());
+      writeClient.addColumn("f_row_map.value.f3", stringType.getAvroSchema());
 
       // delete and add a field with the same name
       // reads should not return previously inserted datum of dropped field of the same name
       writeClient.deleteColumns("f_struct.drop_add");
-      writeClient.addColumn("f_struct.drop_add", doubleType);
+      writeClient.addColumn("f_struct.drop_add", doubleType.getAvroSchema());
       writeClient.deleteColumns("f_row_map.value.drop_add");
-      writeClient.addColumn("f_row_map.value.drop_add", doubleType);
+      writeClient.addColumn("f_row_map.value.drop_add", 
doubleType.getAvroSchema());
 
       // perform comprehensive evolution on complex types (struct, array, map) by promoting its primitive types
       writeClient.updateColumnType("f_struct.change_type", Types.LongType.get());
@@ -284,9 +295,9 @@ public class ITTestSchemaEvolution {
       writeClient.updateColumnType("f_map.value", Types.DoubleType.get());
 
       // perform comprehensive schema evolution on table by adding complex typed columns
-      writeClient.addColumn("new_row_col", structType);
-      writeClient.addColumn("new_array_col", arrayType);
-      writeClient.addColumn("new_map_col", mapType);
+      writeClient.addColumn("new_row_col", structType.getAvroSchema());
+      writeClient.addColumn("new_array_col", arrayType.getAvroSchema());
+      writeClient.addColumn("new_map_col", mapType.getAvroSchema());
 
       writeClient.reOrderColPosition("partition", "new_map_col", AFTER);
 
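The ITTestSchemaEvolution rewrite above is also a useful reference for building schemas with the HoodieSchema factory methods instead of Avro's SchemaBuilder. Below is a standalone sketch that only uses calls appearing in the diff; APIs that still accept Avro, such as the write client's addColumn, are handed the getAvroSchema() view.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.schema.HoodieSchemaField;
    import org.apache.hudi.common.schema.HoodieSchemaType;

    public class HoodieSchemaFactorySketch {
      public static void main(String[] args) {
        // Nullable primitives.
        HoodieSchema stringType = HoodieSchema.createNullable(HoodieSchemaType.STRING);
        HoodieSchema longType = HoodieSchema.createNullable(HoodieSchemaType.LONG);

        // Record with two nullable fields defaulting to null.
        List<HoodieSchemaField> fields = Arrays.asList(
            HoodieSchemaField.of("f0", longType, null, HoodieSchema.NULL_VALUE),
            HoodieSchemaField.of("f1", stringType, null, HoodieSchema.NULL_VALUE));
        HoodieSchema structType = HoodieSchema.createRecord("new_row_col", null, null, fields);

        // Nullable array and map of strings.
        HoodieSchema arrayType = HoodieSchema.createNullable(HoodieSchema.createArray(stringType));
        HoodieSchema mapType = HoodieSchema.createNullable(HoodieSchema.createMap(stringType));

        // Bridge back to Avro where a caller still expects org.apache.avro.Schema.
        System.out.println(structType.getAvroSchema());
        System.out.println(arrayType.getAvroSchema());
        System.out.println(mapType.getAvroSchema());
      }
    }
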
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
index 56b82578bd4d..a17264707ba7 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.avro.Schema;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieReaderContext;
@@ -47,12 +48,11 @@ import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.table.format.FlinkRowDataReaderContext;
 import org.apache.hudi.table.format.InternalSchemaManager;
-import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.RowDataAvroQueryContexts;
 import org.apache.hudi.utils.TestData;
 
-import org.apache.avro.Schema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -146,13 +146,13 @@ public class TestHoodieFileGroupReaderOnFlink extends TestHoodieFileGroupReaderB
     writeConfigs.forEach((key, value) -> conf.setString(key, value));
     conf.set(FlinkOptions.ORDERING_FIELDS, ConfigUtils.getOrderingFieldsStrDuringWrite(writeConfigs));
     conf.set(FlinkOptions.OPERATION, operation);
-    Schema localSchema = getRecordAvroSchema(schemaStr);
+    HoodieSchema localSchema = getRecordSchema(schemaStr);
     conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, localSchema.toString());
     AvroToRowDataConverters.AvroToRowDataConverter avroConverter =
-        RowDataAvroQueryContexts.fromAvroSchema(localSchema).getAvroToRowDataConverter();
+        RowDataAvroQueryContexts.fromAvroSchema(localSchema.getAvroSchema()).getAvroToRowDataConverter();
     List<RowData> rowDataList = recordList.stream().map(record -> {
       try {
-        return (RowData) avroConverter.convert(record.toIndexedRecord(localSchema, CollectionUtils.emptyProps()).get().getData());
+        return (RowData) avroConverter.convert(record.toIndexedRecord(localSchema.getAvroSchema(), CollectionUtils.emptyProps()).get().getData());
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -324,8 +324,8 @@ public class TestHoodieFileGroupReaderOnFlink extends TestHoodieFileGroupReaderB
     }
   }
 
-  private static Schema getRecordAvroSchema(String schemaStr) {
-    Schema recordSchema = new Schema.Parser().parse(schemaStr);
-    return AvroSchemaConverter.convertToSchema(RowDataAvroQueryContexts.fromAvroSchema(recordSchema).getRowType().getLogicalType());
+  private static HoodieSchema getRecordSchema(String schemaStr) {
+    HoodieSchema recordSchema = new HoodieSchema.Parser().parse(schemaStr);
+    return HoodieSchemaConverter.convertToSchema(RowDataAvroQueryContexts.fromAvroSchema(recordSchema.getAvroSchema()).getRowType().getLogicalType());
   }
 }
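
The helper rewritten above also shows the string round trip: a schema string is parsed into HoodieSchema, projected to the Flink row type, and converted back with HoodieSchemaConverter. A minimal sketch of that normalization step (the method name is illustrative):

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.util.HoodieSchemaConverter;
    import org.apache.hudi.util.RowDataAvroQueryContexts;

    public class SchemaRoundTripSketch {
      // Sketch: parse, project to the Flink logical type, and convert back to HoodieSchema.
      static HoodieSchema normalize(String schemaStr) {
        HoodieSchema parsed = new HoodieSchema.Parser().parse(schemaStr);
        return HoodieSchemaConverter.convertToSchema(
            RowDataAvroQueryContexts.fromAvroSchema(parsed.getAvroSchema()).getRowType().getLogicalType());
      }
    }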
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index c2c36bec4fd1..87515ac6d267 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
+import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.source.ExpressionPredicates;
@@ -32,7 +33,6 @@ import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 
-import org.apache.avro.Schema;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.configuration.Configuration;
@@ -145,8 +145,8 @@ public class TestHoodieTableSource {
   void testGetTableAvroSchema() {
     HoodieTableSource tableSource = getEmptyStreamingSource();
     assertNull(tableSource.getMetaClient(), "Streaming source with empty table 
path is allowed");
-    final String schemaFields = 
tableSource.getTableAvroSchema().getFields().stream()
-        .map(Schema.Field::name)
+    final String schemaFields = 
tableSource.getTableSchema().getFields().stream()
+        .map(HoodieSchemaField::name)
         .collect(Collectors.joining(","));
     final String expected = "_hoodie_commit_time,"
         + "_hoodie_commit_seqno,"
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
index e98968845858..751f63d6de75 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
@@ -56,6 +56,7 @@ class TestFlinkRowDataReaderContext {
           HoodieSchemaField.of("name", 
HoodieSchema.create(HoodieSchemaType.STRING)),
           HoodieSchemaField.of("active", 
HoodieSchema.create(HoodieSchemaType.BOOLEAN))
       ));
+
   private FlinkRowDataReaderContext readerContext;
 
   @BeforeEach
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
index 74b82eee1846..afe75e061566 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
@@ -19,10 +19,10 @@
 package org.apache.hudi.utils;
 
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
 import org.apache.hudi.util.AvroSchemaConverter;
 
-import org.apache.avro.Schema;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;
 import org.junit.jupiter.api.Test;
@@ -37,8 +37,8 @@ public class TestAvroSchemaConverter {
 
   @Test
   void testUnionSchemaWithMultipleRecordTypes() {
-    Schema schema = HoodieMetadataRecord.SCHEMA$;
-    DataType dataType = AvroSchemaConverter.convertToDataType(schema);
+    HoodieSchema schema = HoodieSchema.fromAvroSchema(HoodieMetadataRecord.SCHEMA$);
+    DataType dataType = AvroSchemaConverter.convertToDataType(schema.getAvroSchema());
     int pos = HoodieMetadataRecord.SCHEMA$.getField(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).pos();
     final String expected = "ROW<"
         + "`fileName` STRING, "
@@ -62,7 +62,7 @@ public class TestAvroSchemaConverter {
         DataTypes.FIELD("f_localtimestamp_micros", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))
     );
     // convert to avro schema
-    Schema schema = 
AvroSchemaConverter.convertToSchema(dataType.getLogicalType());
+    HoodieSchema schema = 
HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(dataType.getLogicalType()));
     final String expectedSchema = ""
         + "[ \"null\", {\n"
         + "  \"type\" : \"record\",\n"
@@ -85,7 +85,7 @@ public class TestAvroSchemaConverter {
         + "} ]";
     assertThat(schema.toString(true), is(expectedSchema));
     // convert it back
-    DataType convertedDataType = AvroSchemaConverter.convertToDataType(schema);
+    DataType convertedDataType = AvroSchemaConverter.convertToDataType(schema.getAvroSchema());
     final String expectedDataType = "ROW<"
         + "`f_localtimestamp_millis` TIMESTAMP_LTZ(3), "
         + "`f_localtimestamp_micros` TIMESTAMP_LTZ(6)>";
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 6b5f79c6872b..3ed961b8d5bf 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
@@ -969,6 +970,7 @@ public class TestData {
     HoodieTableMetaClient metaClient = createMetaClient(basePath);
     HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
     Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+    HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(schema);
 
     String latestInstant = metaClient.getActiveTimeline().filterCompletedInstants()
         .lastInstant().map(HoodieInstant::requestedTime).orElse(null);
@@ -983,7 +985,7 @@ public class TestData {
       List<String> readBuffer = new ArrayList<>();
       List<FileSlice> fileSlices = table.getSliceView().getLatestMergedFileSlicesBeforeOrOn(partitionDir.getName(), latestInstant).collect(Collectors.toList());
       for (FileSlice fileSlice : fileSlices) {
-        try (ClosableIterator<RowData> rowIterator = getRecordIterator(fileSlice, schema, metaClient, config)) {
+        try (ClosableIterator<RowData> rowIterator = getRecordIterator(fileSlice, hoodieSchema, metaClient, config)) {
           while (rowIterator.hasNext()) {
             RowData rowData = rowIterator.next();
             readBuffer.add(filterOutVariables(schema, rowData));
@@ -1014,7 +1016,7 @@ public class TestData {
 
   private static ClosableIterator<RowData> getRecordIterator(
       FileSlice fileSlice,
-      Schema tableSchema,
+      HoodieSchema tableSchema,
       HoodieTableMetaClient metaClient,
       HoodieWriteConfig writeConfig) throws IOException {
     HoodieFileGroupReader<RowData> fileGroupReader =
