This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5032c39ebb4e feat(schema): Migrate clustering operations to use 
HoodieSchema (#17691)
5032c39ebb4e is described below

commit 5032c39ebb4eacfb4215b268f7011ed28ae587b1
Author: Tim Brown <[email protected]>
AuthorDate: Fri Dec 26 15:34:29 2025 -0500

    feat(schema): Migrate clustering operations to use HoodieSchema (#17691)
---
 .../cluster/strategy/ClusteringExecutionStrategy.java      |  3 +--
 .../hudi/table/action/commit/BaseCommitActionExecutor.java |  6 +++---
 .../clustering/run/strategy/JavaExecutionStrategy.java     | 14 +++++++-------
 .../run/strategy/JavaSortAndSizeExecutionStrategy.java     |  4 ++--
 .../bulkinsert/JavaCustomColumnsSortPartitioner.java       |  7 +++----
 .../bulkinsert/TestJavaBulkInsertInternalPartitioner.java  |  2 +-
 .../run/strategy/MultipleSparkJobExecutionStrategy.java    |  2 +-
 .../SingleSparkJobConsistentHashingExecutionStrategy.java  |  3 +--
 .../run/strategy/SingleSparkJobExecutionStrategy.java      | 11 ++++-------
 .../SparkBinaryCopyClusteringExecutionStrategy.java        | 14 +++-----------
 .../main/java/org/apache/hudi/avro/HoodieAvroUtils.java    |  5 +++--
 .../java/org/apache/hudi/common/schema/HoodieSchema.java   |  1 +
 .../common/table/read/FileGroupReaderSchemaHandler.java    |  3 +--
 .../main/java/org/apache/hudi/common/util/CommitUtils.java |  4 ++--
 .../main/java/org/apache/hudi/common/util/SortUtils.java   |  8 +++-----
 .../internal/schema/convert/InternalSchemaConverter.java   |  2 +-
 .../java/org/apache/hudi/avro/TestHoodieAvroUtils.java     |  2 +-
 .../table/read/TestFileGroupReaderSchemaHandler.java       |  4 ++--
 .../common/table/read/TestHoodieFileGroupReaderBase.java   | 10 +++++-----
 .../table/read/TestParquetRowIndexBasedSchemaHandler.java  |  4 ++--
 .../java/org/apache/hudi/common/util/TestSortUtils.java    |  8 ++++----
 .../main/java/org/apache/hudi/utilities/UtilHelpers.java   |  3 +--
 .../java/org/apache/hudi/utilities/sources/InputBatch.java |  4 +---
 .../sources/helpers/CloudObjectsSelectorCommon.java        |  3 +--
 .../org/apache/hudi/utilities/streamer/StreamSync.java     |  6 +++---
 .../org/apache/hudi/utilities/DummySchemaProvider.java     |  3 +--
 .../org/apache/hudi/utilities/streamer/TestStreamSync.java |  2 +-
 27 files changed, 59 insertions(+), 79 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
index d6f957b50092..bc7fec18a564 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
@@ -48,7 +48,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import lombok.AccessLevel;
 import lombok.Getter;
-import org.apache.avro.Schema;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -88,7 +87,7 @@ public abstract class ClusteringExecutionStrategy<T, I, K, O> 
implements Seriali
    * file groups created is bounded by numOutputGroups.
    * Note that commit is not done as part of strategy. commit is callers 
responsibility.
    */
-  public abstract HoodieWriteMetadata<O> performClustering(final 
HoodieClusteringPlan clusteringPlan, final Schema schema, final String 
instantTime);
+  public abstract HoodieWriteMetadata<O> performClustering(final 
HoodieClusteringPlan clusteringPlan, final HoodieSchema schema, final String 
instantTime);
 
   protected ClosableIterator<HoodieRecord<T>> 
getRecordIterator(ReaderContextFactory<T> readerContextFactory, 
ClusteringOperation operation, String instantTime, long maxMemory) {
     TypedProperties props = getReaderProperties(maxMemory);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index ae117c002da1..ff9fcb866692 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -61,7 +62,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import 
org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -291,9 +291,9 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, 
R>
     ClusteringUtils.transitionClusteringOrReplaceRequestedToInflight(instant, 
Option.empty(), table.getActiveTimeline());
     table.getMetaClient().reloadActiveTimeline();
 
-    Option<Schema> schema;
+    Option<HoodieSchema> schema;
     try {
-      schema = new 
TableSchemaResolver(table.getMetaClient()).getTableAvroSchemaIfPresent(false);
+      schema = new 
TableSchemaResolver(table.getMetaClient()).getTableSchemaIfPresent(false);
     } catch (Exception ex) {
       throw new HoodieSchemaException(ex);
     }
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index c8b2b76a5952..f0a0cf1c47df 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -19,7 +19,6 @@
 
 package org.apache.hudi.client.clustering.run.strategy;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.WriteStatus;
@@ -32,6 +31,8 @@ import org.apache.hudi.common.model.ClusteringOperation;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -44,7 +45,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import 
org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -68,7 +68,7 @@ public abstract class JavaExecutionStrategy<T>
 
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(
-      HoodieClusteringPlan clusteringPlan, Schema schema, String instantTime) {
+      HoodieClusteringPlan clusteringPlan, HoodieSchema schema, String 
instantTime) {
     // execute clustering for each group and collect WriteStatus
     List<WriteStatus> writeStatusList = new ArrayList<>();
     clusteringPlan.getInputGroups().forEach(
@@ -97,7 +97,7 @@ public abstract class JavaExecutionStrategy<T>
    */
   public abstract List<WriteStatus> performClusteringWithRecordList(
       final List<HoodieRecord<T>> inputRecords, final int numOutputGroups, 
final String instantTime,
-      final Map<String, String> strategyParams, final Schema schema,
+      final Map<String, String> strategyParams, final HoodieSchema schema,
       final List<HoodieFileGroupId> fileGroupIdList, final boolean 
preserveHoodieMetadata);
 
   /**
@@ -107,11 +107,11 @@ public abstract class JavaExecutionStrategy<T>
    * @param schema         Schema of the data including metadata fields.
    * @return partitioner for the java engine
    */
-  protected BulkInsertPartitioner<List<HoodieRecord<T>>> 
getPartitioner(Map<String, String> strategyParams, Schema schema) {
+  protected BulkInsertPartitioner<List<HoodieRecord<T>>> 
getPartitioner(Map<String, String> strategyParams, HoodieSchema schema) {
     if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
       return new JavaCustomColumnsSortPartitioner(
           strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
-          HoodieAvroUtils.addMetadataFields(schema), getWriteConfig());
+          HoodieSchemaUtils.addMetadataFields(schema), getWriteConfig());
     } else {
       return 
JavaBulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode());
     }
@@ -124,7 +124,7 @@ public abstract class JavaExecutionStrategy<T>
       HoodieClusteringGroup clusteringGroup, Map<String, String> 
strategyParams,
       boolean preserveHoodieMetadata, String instantTime) {
     List<HoodieRecord<T>> inputRecords = readRecordsForGroup(clusteringGroup, 
instantTime);
-    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(getWriteConfig().getSchema()));
+    HoodieSchema readerSchema = 
HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(getWriteConfig().getSchema()));
     List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()
         .map(info -> new HoodieFileGroupId(info.getPartitionPath(), 
info.getFileId()))
         .collect(Collectors.toList());
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
index ffc815fd48f2..22d2b4eb53a6 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
@@ -25,13 +25,13 @@ import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.JavaBulkInsertHelper;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 
 import java.util.List;
 import java.util.Map;
@@ -54,7 +54,7 @@ public class JavaSortAndSizeExecutionStrategy<T>
   @Override
   public List<WriteStatus> performClusteringWithRecordList(
       final List<HoodieRecord<T>> inputRecords, final int numOutputGroups,
-      final String instantTime, final Map<String, String> strategyParams, 
final Schema schema,
+      final String instantTime, final Map<String, String> strategyParams, 
final HoodieSchema schema,
       final List<HoodieFileGroupId> fileGroupIdList, final boolean 
preserveHoodieMetadata) {
     log.info("Starting clustering for a group, parallelism: {} commit: {}", 
numOutputGroups, instantTime);
 
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
index 54f0c94ff069..1faf0ad45d75 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
@@ -20,13 +20,12 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.SortUtils;
 import org.apache.hudi.common.util.collection.FlatLists;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
-import org.apache.avro.Schema;
-
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -41,11 +40,11 @@ public class JavaCustomColumnsSortPartitioner<T>
     implements BulkInsertPartitioner<List<HoodieRecord<T>>> {
 
   private final String[] sortColumnNames;
-  private final Schema schema;
+  private final HoodieSchema schema;
   private final boolean consistentLogicalTimestampEnabled;
   private final boolean suffixRecordKey;
 
-  public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema, 
HoodieWriteConfig config) {
+  public JavaCustomColumnsSortPartitioner(String[] columnNames, HoodieSchema 
schema, HoodieWriteConfig config) {
     this.sortColumnNames = columnNames;
     this.schema = schema;
     this.consistentLogicalTimestampEnabled = 
config.isConsistentLogicalTimestampEnabled();
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
index ff8909b85ddc..2dff037f4472 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
@@ -70,7 +70,7 @@ public class TestJavaBulkInsertInternalPartitioner extends 
HoodieJavaClientTestH
     cfg.setValue(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME, 
"partition_path");
     
cfg.setValue(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED,
 "false");
     testBulkInsertInternalPartitioner(
-        new JavaCustomColumnsSortPartitioner(sortColumns, 
HoodieTestDataGenerator.AVRO_SCHEMA, cfg),
+        new JavaCustomColumnsSortPartitioner(sortColumns, 
HoodieTestDataGenerator.HOODIE_SCHEMA, cfg),
         records, true, generatePartitionNumRecords(records), 
Option.of(columnComparator));
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 69d50c1367ca..2c409a50bf1b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -101,7 +101,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
   }
 
   @Override
-  public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final 
HoodieClusteringPlan clusteringPlan, final Schema schema, final String 
instantTime) {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final 
HoodieClusteringPlan clusteringPlan, final HoodieSchema schema, final String 
instantTime) {
     JavaSparkContext engineContext = 
HoodieSparkEngineContext.getSparkContext(getEngineContext());
     boolean shouldPreserveMetadata = 
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(true);
     ExecutorService clusteringExecutorService = Executors.newFixedThreadPool(
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
index c753d83d3462..5dccf0637b9f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
@@ -22,7 +22,6 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.LazyConcatenatingIterator;
-import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.ReaderContextFactory;
 import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -83,7 +82,7 @@ public class 
SingleSparkJobConsistentHashingExecutionStrategy<T> extends SingleS
 
   @Override
   protected List<WriteStatus> 
performClusteringForGroup(ReaderContextFactory<T> readerContextFactory, 
ClusteringGroupInfo clusteringGroup, Map<String, String> strategyParams,
-                                                        boolean 
preserveHoodieMetadata, SerializableSchema schema, TaskContextSupplier 
taskContextSupplier, String instantTime) {
+                                                        boolean 
preserveHoodieMetadata, HoodieSchema schema, TaskContextSupplier 
taskContextSupplier, String instantTime) {
     // deal with split / merge operations
     ValidationUtils.checkArgument(clusteringGroup.getNumOutputGroups() >= 1, 
"Number of output groups should be at least 1");
     if (clusteringGroup.getNumOutputGroups() == 1) {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
index 17ec3ccfd2e0..e4ebf13f2b0f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client.clustering.run.strategy;
 
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.ReaderContextFactory;
@@ -28,14 +27,13 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.ClusteringGroupInfo;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import 
org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
 
-import org.apache.avro.Schema;
-
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -52,16 +50,15 @@ public abstract class SingleSparkJobExecutionStrategy<T>
   }
 
   @Override
-  public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final 
HoodieClusteringPlan clusteringPlan, final Schema schema, final String 
instantTime) {
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final 
HoodieClusteringPlan clusteringPlan, final HoodieSchema schema, final String 
instantTime) {
     final TaskContextSupplier taskContextSupplier = 
getEngineContext().getTaskContextSupplier();
-    final SerializableSchema serializableSchema = new 
SerializableSchema(schema);
     final List<ClusteringGroupInfo> clusteringGroupInfos = 
clusteringPlan.getInputGroups().stream().map(ClusteringGroupInfo::create).collect(Collectors.toList());
 
     ReaderContextFactory<T> readerContextFactory = 
getEngineContext().getReaderContextFactory(getHoodieTable().getMetaClient());
     HoodieData<WriteStatus> writeStatus = 
getEngineContext().parallelize(clusteringGroupInfos).map(group -> {
       return performClusteringForGroup(readerContextFactory, group, 
clusteringPlan.getStrategy().getStrategyParams(),
           
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
-          serializableSchema, taskContextSupplier, instantTime);
+          schema, taskContextSupplier, instantTime);
     }).flatMap(List::iterator);
     HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new 
HoodieWriteMetadata<>();
     writeMetadata.setWriteStatuses(writeStatus);
@@ -72,7 +69,7 @@ public abstract class SingleSparkJobExecutionStrategy<T>
    * Submit a task to execute clustering for the group.
    */
   protected abstract List<WriteStatus> 
performClusteringForGroup(ReaderContextFactory<T> readerContextFactory, 
ClusteringGroupInfo clusteringGroup, Map<String, String> strategyParams,
-                                                                 boolean 
preserveHoodieMetadata, SerializableSchema schema,
+                                                                 boolean 
preserveHoodieMetadata, HoodieSchema schema,
                                                                  
TaskContextSupplier taskContextSupplier, String instantTime);
 
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkBinaryCopyClusteringExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkBinaryCopyClusteringExecutionStrategy.java
index e3b09b48fd1c..f68579e580d7 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkBinaryCopyClusteringExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkBinaryCopyClusteringExecutionStrategy.java
@@ -22,13 +22,13 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.HoodieStorageConfig;
-import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.ClusteringGroupInfo;
 import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
@@ -40,7 +40,6 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -77,7 +76,7 @@ public class SparkBinaryCopyClusteringExecutionStrategy<T> 
extends SparkSortAndS
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(
       HoodieClusteringPlan clusteringPlan,
-      Schema schema,
+      HoodieSchema schema,
       String instantTime) {
 
     List<ClusteringGroupInfo> clusteringGroupInfos = 
clusteringPlan.getInputGroups()
@@ -95,8 +94,6 @@ public class SparkBinaryCopyClusteringExecutionStrategy<T> 
extends SparkSortAndS
 
     JavaSparkContext engineContext = 
HoodieSparkEngineContext.getSparkContext(getEngineContext());
     TaskContextSupplier taskContextSupplier = 
getEngineContext().getTaskContextSupplier();
-    SerializableSchema serializableSchema = new SerializableSchema(schema);
-    boolean shouldPreserveMetadata = 
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);
     JavaRDD<ClusteringGroupInfo> groupInfoJavaRDD = 
engineContext.parallelize(clusteringGroupInfos, clusteringGroupInfos.size());
     log.info("number of partitions for clustering " + 
groupInfoJavaRDD.getNumPartitions());
     JavaRDD<WriteStatus> writeStatusRDD = groupInfoJavaRDD
@@ -106,9 +103,6 @@ public class SparkBinaryCopyClusteringExecutionStrategy<T> 
extends SparkSortAndS
               .flatMap(clusteringOp ->
                   runClusteringForGroup(
                       clusteringOp,
-                      clusteringPlan.getStrategy().getStrategyParams(),
-                      shouldPreserveMetadata,
-                      serializableSchema,
                       taskContextSupplier,
                       instantTime))
               .iterator();
@@ -122,9 +116,7 @@ public class SparkBinaryCopyClusteringExecutionStrategy<T> 
extends SparkSortAndS
   /**
    * Submit job to execute clustering for the group.
    */
-  private Stream<WriteStatus> runClusteringForGroup(ClusteringGroupInfo 
clusteringOps, Map<String, String> strategyParams,
-                                                    boolean 
preserveHoodieMetadata, SerializableSchema schema,
-                                                    TaskContextSupplier 
taskContextSupplier, String instantTime) {
+  private Stream<WriteStatus> runClusteringForGroup(ClusteringGroupInfo 
clusteringOps, TaskContextSupplier taskContextSupplier, String instantTime) {
     List<WriteStatus> statuses = new ArrayList<>();
     List<HoodieFileGroupId> inputFileIds = clusteringOps.getOperations()
         .stream()
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 46b98f9ca795..0dce463eef0c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -21,6 +21,7 @@ package org.apache.hudi.avro;
 import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.DateTimeUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SpillableMapUtils;
@@ -980,11 +981,11 @@ public class HoodieAvroUtils {
    */
   public static Object[] 
getSortColumnValuesWithPartitionPathAndRecordKey(HoodieRecord record,
                                                                           
String[] columns,
-                                                                          
Schema schema,
+                                                                          
HoodieSchema schema,
                                                                           
boolean suffixRecordKey,
                                                                           
boolean consistentLogicalTimestampEnabled) {
     try {
-      GenericRecord genericRecord = (GenericRecord) 
record.toIndexedRecord(schema, PROPERTIES).get().getData();
+      GenericRecord genericRecord = (GenericRecord) 
record.toIndexedRecord(schema.toAvroSchema(), PROPERTIES).get().getData();
       int numColumns = columns.length;
       Object[] values = new Object[columns.length + 1 + (suffixRecordKey ? 1 : 
0)];
       values[0] = record.getPartitionPath();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
index 2da62bb2924b..3ab2059a18b0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
@@ -85,6 +85,7 @@ public class HoodieSchema implements Serializable {
    * This provides compatibility with Avro's JsonProperties while maintaining 
Hudi's API.
    */
   public static final Object NULL_VALUE = JsonProperties.NULL_VALUE;
+  public static final HoodieSchema NULL_SCHEMA = 
HoodieSchema.create(HoodieSchemaType.NULL);
   private static final long serialVersionUID = 1L;
   private Schema avroSchema;
   private HoodieSchemaType type;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
index 09ad49be147f..723bd2ddeea2 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
@@ -44,7 +44,6 @@ import org.apache.hudi.storage.StoragePath;
 
 import lombok.Getter;
 import lombok.Setter;
-import org.apache.avro.Schema;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -293,7 +292,7 @@ public class FileGroupReaderSchemaHandler<T> {
   }
 
   /**
-   * Get {@link Schema.Field} from {@link Schema} by field name.
+   * Get {@link HoodieSchemaField} from {@link HoodieSchema} by field name.
    */
   private static HoodieSchemaField getField(HoodieSchema schema, String 
fieldName) {
     Option<HoodieSchemaField> foundFieldOpt = findNestedField(schema, 
fieldName);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
index e75e0a0b728a..af551aab0a4e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
@@ -23,6 +23,7 @@ import 
org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -31,7 +32,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.avro.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +49,7 @@ import java.util.stream.Collectors;
 public class CommitUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(CommitUtils.class);
-  private static final String NULL_SCHEMA_STR = 
Schema.create(Schema.Type.NULL).toString();
+  private static final String NULL_SCHEMA_STR = 
HoodieSchema.NULL_SCHEMA.toString();
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java
index c043f7a2fef0..1558cbd40be2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java
@@ -23,8 +23,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.collection.FlatLists;
 
-import org.apache.avro.Schema;
-
 import java.util.function.Function;
 
 /**
@@ -58,12 +56,12 @@ public class SortUtils {
   public static FlatLists.ComparableList<Comparable<HoodieRecord>> 
getComparableSortColumns(
       HoodieRecord record,
       String[] sortColumnNames,
-      Schema schema,
+      HoodieSchema schema,
       boolean suffixRecordKey,
       boolean consistentLogicalTimestampEnabled
   ) {
     if (record.getRecordType() == HoodieRecord.HoodieRecordType.SPARK) {
-      Object[] columnValues = record.getColumnValues(schema, sortColumnNames, 
consistentLogicalTimestampEnabled);
+      Object[] columnValues = record.getColumnValues(schema.toAvroSchema(), 
sortColumnNames, consistentLogicalTimestampEnabled);
       if (suffixRecordKey) {
         return FlatLists.ofComparableArray(
             prependPartitionPathAndSuffixRecordKey(record.getPartitionPath(), 
record.getRecordKey(), columnValues));
@@ -106,7 +104,7 @@ public class SortUtils {
     } else if (record.getRecordType() == HoodieRecord.HoodieRecordType.AVRO) {
       return FlatLists.ofComparableArray(wrapUTF8StringFunc.apply(
           HoodieAvroUtils.getSortColumnValuesWithPartitionPathAndRecordKey(
-              record, sortColumnNames, schema.toAvroSchema(), suffixRecordKey, 
consistentLogicalTimestampEnabled
+              record, sortColumnNames, schema, suffixRecordKey, 
consistentLogicalTimestampEnabled
           )));
     }
     throw new IllegalArgumentException("Invalid recordType" + 
record.getRecordType());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java
 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java
index d055e16c58ce..d89b7f197685 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java
@@ -149,7 +149,7 @@ public class InternalSchemaConverter {
    */
   public static HoodieSchema fixNullOrdering(HoodieSchema schema) {
     if (schema == null) {
-      return HoodieSchema.create(HoodieSchemaType.NULL);
+      return HoodieSchema.NULL_SCHEMA;
     } else if (schema.getType() == HoodieSchemaType.NULL) {
       return schema;
     }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index ab57ebb4bea5..2d460af7e9fc 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -969,7 +969,7 @@ public class TestHoodieAvroUtils {
     HoodieAvroRecord avroRecord = new HoodieAvroRecord(new 
HoodieKey("record1", "partition1"), avroPayload);
 
     String[] userSortColumns = new String[] {"non_pii_col", "timestamp"};
-    Object[] sortColumnValues = 
HoodieAvroUtils.getSortColumnValuesWithPartitionPathAndRecordKey(avroRecord, 
userSortColumns, Schema.parse(EXAMPLE_SCHEMA), suffixRecordKey, true);
+    Object[] sortColumnValues = 
HoodieAvroUtils.getSortColumnValuesWithPartitionPathAndRecordKey(avroRecord, 
userSortColumns, HoodieSchema.parse(EXAMPLE_SCHEMA), suffixRecordKey, true);
     if (suffixRecordKey) {
       assertArrayEquals(new Object[] {"partition1", "val1", 3.5, "record1"}, 
sortColumnValues);
     } else {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
index 0f6c274c0f7d..e2a2b7c34c70 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.util.InternalSchemaCache;
@@ -41,7 +42,6 @@ import org.apache.hudi.internal.schema.Types;
 import org.apache.hudi.internal.schema.convert.InternalSchemaConverter;
 import org.apache.hudi.storage.StoragePath;
 
-import org.apache.avro.Schema;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -96,7 +96,7 @@ public class TestFileGroupReaderSchemaHandler extends 
SchemaHandlerTestBase {
     assertTrue(readerContext.getNeedsBootstrapMerge());
     HoodieSchema expectedRequiredSchema = 
generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", 
"rider");
     assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema());
-    Pair<List<Schema.Field>, List<Schema.Field>> bootstrapFields = 
schemaHandler.getBootstrapRequiredFields();
+    Pair<List<HoodieSchemaField>, List<HoodieSchemaField>> bootstrapFields = 
schemaHandler.getBootstrapRequiredFields();
     assertEquals(Collections.singletonList(getField("_hoodie_record_key")), 
bootstrapFields.getLeft());
     assertEquals(Arrays.asList(getField("begin_lat"), getField("tip_history"), 
getField("rider")), bootstrapFields.getRight());
   }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index b1ff1656157d..2101a17e26df 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -712,11 +712,11 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
                                                    List<HoodieRecord> 
expectedHoodieUnmergedRecords,
                                                    String[] orderingFields) 
throws Exception {
     HoodieTableMetaClient metaClient = 
HoodieTestUtils.createMetaClient(storageConf, tablePath);
-    Schema avroSchema = new 
TableSchemaResolver(metaClient).getTableAvroSchema();
-    expectedHoodieRecords = 
getExpectedHoodieRecordsWithOrderingValue(expectedHoodieRecords, metaClient, 
avroSchema);
-    expectedHoodieUnmergedRecords = 
getExpectedHoodieRecordsWithOrderingValue(expectedHoodieUnmergedRecords, 
metaClient, avroSchema);
-    List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords = 
convertHoodieRecords(expectedHoodieRecords, avroSchema, orderingFields);
-    List<HoodieTestDataGenerator.RecordIdentifier> expectedUnmergedRecords = 
convertHoodieRecords(expectedHoodieUnmergedRecords, avroSchema, orderingFields);
+    HoodieSchema schema = new TableSchemaResolver(metaClient).getTableSchema();
+    expectedHoodieRecords = 
getExpectedHoodieRecordsWithOrderingValue(expectedHoodieRecords, metaClient, 
schema.toAvroSchema());
+    expectedHoodieUnmergedRecords = 
getExpectedHoodieRecordsWithOrderingValue(expectedHoodieUnmergedRecords, 
metaClient, schema.toAvroSchema());
+    List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords = 
convertHoodieRecords(expectedHoodieRecords, schema.toAvroSchema(), 
orderingFields);
+    List<HoodieTestDataGenerator.RecordIdentifier> expectedUnmergedRecords = 
convertHoodieRecords(expectedHoodieUnmergedRecords, schema.toAvroSchema(), 
orderingFields);
     validateOutputFromFileGroupReaderWithExistingRecords(
         storageConf, tablePath, containsBaseFile, expectedLogFileNum, 
recordMergeMode,
         expectedRecords, expectedUnmergedRecords);
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java
index a00546531364..f6cd3426290e 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java
@@ -23,10 +23,10 @@ import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 
-import org.apache.avro.Schema;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -55,7 +55,7 @@ public class TestParquetRowIndexBasedSchemaHandler extends 
SchemaHandlerTestBase
     //meta cols must go first in the required schema
     HoodieSchema expectedRequiredSchema = 
generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", 
"rider");
     assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema());
-    Pair<List<Schema.Field>, List<Schema.Field>> bootstrapFields = 
schemaHandler.getBootstrapRequiredFields();
+    Pair<List<HoodieSchemaField>, List<HoodieSchemaField>> bootstrapFields = 
schemaHandler.getBootstrapRequiredFields();
     assertEquals(Arrays.asList(getField("_hoodie_record_key"), 
getPositionalMergeField()), bootstrapFields.getLeft());
     assertEquals(Arrays.asList(getField("begin_lat"), getField("tip_history"), 
getField("rider"), getPositionalMergeField()), bootstrapFields.getRight());
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSortUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSortUtils.java
index 38df25837a62..b1a44d978529 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSortUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSortUtils.java
@@ -24,9 +24,9 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.RewriteAvroPayload;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.collection.FlatLists;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.junit.jupiter.api.Assertions;
@@ -63,8 +63,8 @@ public class TestSortUtils {
   @ParameterizedTest
   @MethodSource("getArguments")
   void testGetComparableSortColumnsAvroRecord(HoodieRecordType recordType, 
boolean suffixRecordKey) {
-    Schema schema = new Schema.Parser().parse(SCHEMA);
-    GenericRecord genericRecord = new GenericData.Record(schema);
+    HoodieSchema schema = HoodieSchema.parse(SCHEMA);
+    GenericRecord genericRecord = new 
GenericData.Record(schema.toAvroSchema());
     genericRecord.put("non_pii_col", "val1");
     genericRecord.put("pii_col", "val2");
     genericRecord.put("timestamp", 3.5);
@@ -77,7 +77,7 @@ public class TestSortUtils {
       record = new TestSparkRecord(new HoodieKey("record1", "partition1"), 
payload);
     }
     String[] userSortColumns = new String[] {"non_pii_col", "timestamp"};
-    FlatLists.ComparableList<Comparable<HoodieRecord>> comparableList = 
SortUtils.getComparableSortColumns(record, userSortColumns, 
Schema.parse(SCHEMA), suffixRecordKey, true);
+    FlatLists.ComparableList<Comparable<HoodieRecord>> comparableList = 
SortUtils.getComparableSortColumns(record, userSortColumns, 
HoodieSchema.parse(SCHEMA), suffixRecordKey, true);
     Object[] expectedSortColumnValues;
     if (suffixRecordKey) {
       expectedSortColumnValues = new Object[] {"partition1", "val1", 3.5, 
"record1"};
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index e330b821cc51..1ee952d9d763 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -68,7 +68,6 @@ import org.apache.hudi.utilities.schema.SchemaPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
 import 
org.apache.hudi.utilities.schema.postprocessor.ChainedSchemaPostProcessor;
-import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.Source;
 import 
org.apache.hudi.utilities.sources.processor.ChainedJsonKafkaSourcePostProcessor;
 import 
org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
@@ -225,7 +224,7 @@ public class UtilHelpers {
   }
 
   public static StructType getSourceSchema(SchemaProvider schemaProvider) {
-    if (schemaProvider != null && schemaProvider.getSourceHoodieSchema() != 
null && schemaProvider.getSourceHoodieSchema() != InputBatch.NULL_SCHEMA) {
+    if (schemaProvider != null && schemaProvider.getSourceHoodieSchema() != 
null && schemaProvider.getSourceHoodieSchema() != HoodieSchema.NULL_SCHEMA) {
       return 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schemaProvider.getSourceHoodieSchema());
     }
     return null;
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
index 00e5fee1ad9e..f07c09b5e8be 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
@@ -20,7 +20,6 @@ package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.schema.HoodieSchema;
-import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
 import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
 import org.apache.hudi.common.util.Option;
@@ -31,7 +30,6 @@ import org.apache.spark.api.java.JavaSparkContext;
 
 public class InputBatch<T> {
 
-  public static final HoodieSchema NULL_SCHEMA = 
HoodieSchema.create(HoodieSchemaType.NULL);
   private final Option<T> batch;
   private final Checkpoint checkpointForNextBatch;
   private final SchemaProvider schemaProvider;
@@ -90,7 +88,7 @@ public class InputBatch<T> {
 
     @Override
     public HoodieSchema getSourceHoodieSchema() {
-      return NULL_SCHEMA;
+      return HoodieSchema.NULL_SCHEMA;
     }
   }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 2e08a078daf0..6beb1e7bfb42 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -34,7 +34,6 @@ import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.utilities.config.CloudSourceConfig;
 import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.sources.InputBatch;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.conf.Configuration;
@@ -290,7 +289,7 @@ public class CloudObjectsSelectorCommon {
     StructType rowSchema = null;
     if (schemaProviderOption.isPresent()) {
       HoodieSchema sourceSchema = 
schemaProviderOption.get().getSourceHoodieSchema();
-      if (sourceSchema != null && 
!sourceSchema.equals(InputBatch.NULL_SCHEMA)) {
+      if (sourceSchema != null && 
!sourceSchema.equals(HoodieSchema.NULL_SCHEMA)) {
         rowSchema = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(sourceSchema);
         if (isCoalesceRequired(properties, sourceSchema)) {
           reader = reader.schema(addAliasesToRowSchema(sourceSchema, 
rowSchema));
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 2995081d82d3..cf2bc551f7c8 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -692,7 +692,7 @@ public class StreamSync implements Serializable, Closeable {
 
       checkpoint = dataAndCheckpoint.getCheckpointForNextBatch();
       if (this.userProvidedSchemaProvider != null && 
this.userProvidedSchemaProvider.getTargetHoodieSchema() != null
-          && this.userProvidedSchemaProvider.getTargetHoodieSchema() != 
InputBatch.NULL_SCHEMA) {
+          && this.userProvidedSchemaProvider.getTargetHoodieSchema() != 
HoodieSchema.NULL_SCHEMA) {
         // Let's deduce the schema provider for writer side first!
         schemaProvider = 
getDeducedSchemaProvider(this.userProvidedSchemaProvider.getTargetHoodieSchema(),
 this.userProvidedSchemaProvider, metaClient);
         boolean useRowWriter = 
canUseRowWriter(schemaProvider.getTargetHoodieSchema());
@@ -826,7 +826,7 @@ public class StreamSync implements Serializable, Closeable {
     }
     hoodieConfig.setValue(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), 
HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(props));
     hoodieConfig.setValue("path", cfg.targetBasePath);
-    return HoodieSparkSqlWriter.getBulkInsertRowConfig(writerSchema != 
InputBatch.NULL_SCHEMA ? Option.of(writerSchema) : Option.empty(),
+    return HoodieSparkSqlWriter.getBulkInsertRowConfig(writerSchema != 
HoodieSchema.NULL_SCHEMA ? Option.of(writerSchema) : Option.empty(),
         hoodieConfig, cfg.targetBasePath, cfg.targetTableName);
   }
 
@@ -1222,7 +1222,7 @@ public class StreamSync implements Serializable, 
Closeable {
     HoodieSchema newWriteSchema = targetSchema;
     try {
       // check if targetSchema is equal to NULL schema
-      HoodieSchema nullSchema = InputBatch.NULL_SCHEMA;
+      HoodieSchema nullSchema = HoodieSchema.NULL_SCHEMA;
       if (targetSchema == null || 
(HoodieSchemaCompatibility.areSchemasCompatible(targetSchema, nullSchema) && 
HoodieSchemaCompatibility.areSchemasCompatible(nullSchema, targetSchema))) {
         // target schema is null. fetch schema from commit metadata and use it
         int totalCompleted = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants();
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/DummySchemaProvider.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/DummySchemaProvider.java
index f7cce0d3413a..5e1f8ccfb482 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/DummySchemaProvider.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/DummySchemaProvider.java
@@ -20,7 +20,6 @@ package org.apache.hudi.utilities;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.schema.HoodieSchema;
-import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
 import org.apache.spark.api.java.JavaSparkContext;
@@ -32,6 +31,6 @@ public class DummySchemaProvider extends SchemaProvider {
 
   @Override
   public HoodieSchema getSourceHoodieSchema() {
-    return HoodieSchema.create(HoodieSchemaType.NULL);
+    return HoodieSchema.NULL_SCHEMA;
   }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
index 95d11e307fd3..26ba7599b451 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
@@ -233,7 +233,7 @@ public class TestStreamSync extends 
SparkClientFunctionalTestHarness {
   private SchemaProvider getSchemaProvider(String name, boolean 
isNullTargetSchema) {
     SchemaProvider schemaProvider = mock(SchemaProvider.class);
     HoodieSchema sourceSchema = mock(HoodieSchema.class);
-    HoodieSchema targetSchema = isNullTargetSchema ? InputBatch.NULL_SCHEMA : 
mock(HoodieSchema.class);
+    HoodieSchema targetSchema = isNullTargetSchema ? HoodieSchema.NULL_SCHEMA 
: mock(HoodieSchema.class);
     when(schemaProvider.getSourceHoodieSchema()).thenReturn(sourceSchema);
     when(schemaProvider.getTargetHoodieSchema()).thenReturn(targetSchema);
     when(sourceSchema.toString()).thenReturn(name + "SourceSchema");

Reply via email to