This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e8b1ddd708b [HUDI-6329] Adjust the partitioner automatically for flink 
consistent hashing index (#9087)
e8b1ddd708b is described below

commit e8b1ddd708bc2ba99144f92d7533c7200f12509f
Author: Jing Zhang <[email protected]>
AuthorDate: Wed Jul 5 11:09:25 2023 +0800

    [HUDI-6329] Adjust the partitioner automatically for flink consistent 
hashing index (#9087)
    
    * partitioner would detect new completed resize plan in #snapshotState
    * disable scheduling resize plan for insert write pipelines with consistent 
bucket index
---
 ...sistentHashingBucketClusteringPlanStrategy.java |   4 +-
 .../action/cluster/strategy/UpdateStrategy.java    |   4 +-
 .../util/ConsistentHashingUpdateStrategyUtils.java | 107 +++++++++++++++
 ...arkConsistentBucketDuplicateUpdateStrategy.java |  71 +---------
 .../apache/hudi/configuration/OptionsResolver.java |  26 +++-
 .../org/apache/hudi/sink/StreamWriteFunction.java  |  34 +++--
 .../sink/bucket/BucketStreamWriteFunction.java     |   2 +-
 .../sink/bucket/BucketStreamWriteOperator.java     |   5 +-
 .../bucket/ConsistentBucketAssignFunction.java     |  30 ++++-
 .../ConsistentBucketStreamWriteFunction.java       |  83 ++++++++++++
 .../FlinkConsistentBucketUpdateStrategy.java       | 150 +++++++++++++++++++++
 .../java/org/apache/hudi/sink/utils/Pipelines.java |   2 +-
 .../java/org/apache/hudi/util/ClusteringUtil.java  |   5 +-
 .../org/apache/hudi/util/FlinkWriteClients.java    |   8 +-
 .../org/apache/hudi/sink/TestWriteMergeOnRead.java |  40 ++++++
 .../bucket/ITTestConsistentBucketStreamWrite.java  |  23 +++-
 .../utils/BucketStreamWriteFunctionWrapper.java    |  18 ++-
 ...ConsistentBucketStreamWriteFunctionWrapper.java |  81 +++++++++++
 .../apache/hudi/sink/utils/ScalaCollector.java}    |  32 +++--
 .../sink/utils/StreamWriteFunctionWrapper.java     |  22 ---
 .../test/java/org/apache/hudi/utils/TestData.java  |   7 +-
 21 files changed, 611 insertions(+), 143 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
index 59f9fcb81d1..49ab5f181ad 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
@@ -85,7 +85,7 @@ public abstract class 
BaseConsistentHashingBucketClusteringPlanStrategy<T extend
     HoodieTimeline timeline = 
getHoodieTable().getActiveTimeline().getDeltaCommitTimeline().filterInflightsAndRequested();
     if (!timeline.empty()) {
       LOG.warn("When using consistent bucket, clustering cannot be scheduled 
async if there are concurrent writers. "
-          + "Writer instant: " + timeline.getInstants());
+          + "Writer instant: {}.", timeline.getInstants());
       return false;
     }
     return true;
@@ -161,7 +161,7 @@ public abstract class 
BaseConsistentHashingBucketClusteringPlanStrategy<T extend
     TableFileSystemView fileSystemView = getHoodieTable().getFileSystemView();
     boolean isPartitionInClustering = 
fileSystemView.getFileGroupsInPendingClustering().anyMatch(p -> 
p.getLeft().getPartitionPath().equals(partition));
     if (isPartitionInClustering) {
-      LOG.info("Partition: " + partition + " is already in clustering, skip");
+      LOG.info("Partition {} is already in clustering, skip.", partition);
       return Stream.empty();
     }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
index 4463f7887bb..1c61db4b572 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
@@ -32,8 +32,8 @@ import java.util.Set;
 public abstract class UpdateStrategy<T, I> implements Serializable {
 
   protected final transient HoodieEngineContext engineContext;
-  protected final HoodieTable table;
-  protected final Set<HoodieFileGroupId> fileGroupsInPendingClustering;
+  protected HoodieTable table;
+  protected Set<HoodieFileGroupId> fileGroupsInPendingClustering;
 
   public UpdateStrategy(HoodieEngineContext engineContext, HoodieTable table, 
Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
     this.engineContext = engineContext;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/util/ConsistentHashingUpdateStrategyUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/util/ConsistentHashingUpdateStrategyUtils.java
new file mode 100644
index 00000000000..f8351d2fa93
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/util/ConsistentHashingUpdateStrategyUtils.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster.util;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
+import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
+import org.apache.hudi.table.HoodieTable;
+import 
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Utility class for update strategy of table with consistent hash bucket 
index.
+ */
+public class ConsistentHashingUpdateStrategyUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ConsistentHashingUpdateStrategyUtils.class);
+
+  /**
+   * Construct identifier for the given partitions that are under concurrent 
resizing (i.e., clustering).
+   * @return map from partition to pair<instant, identifier>, where instant is 
the clustering instant.
+   */
+  public static Map<String, Pair<String, ConsistentBucketIdentifier>> 
constructPartitionToIdentifier(Set<String> partitions, HoodieTable table) {
+    // Read all pending/ongoing clustering plans
+    List<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPairs =
+        
table.getMetaClient().getActiveTimeline().filterInflightsAndRequested().filter(instant
 -> 
instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).getInstantsAsStream()
+            .map(instant -> 
ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant))
+            .flatMap(o -> o.isPresent() ? Stream.of(o.get()) : Stream.empty())
+            .collect(Collectors.toList());
+
+    // Construct child node for each partition & build the bucket identifier
+    Map<String, HoodieConsistentHashingMetadata> partitionToHashingMeta = new 
HashMap<>();
+    Map<String, String> partitionToInstant = new HashMap<>();
+    for (Pair<HoodieInstant, HoodieClusteringPlan> pair : instantPlanPairs) {
+      String instant = pair.getLeft().getTimestamp();
+      HoodieClusteringPlan plan = pair.getRight();
+      extractHashingMetadataFromClusteringPlan(instant, plan, table, 
partitions, partitionToHashingMeta, partitionToInstant);
+    }
+    return partitionToHashingMeta.entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, e -> 
Pair.of(partitionToInstant.get(e.getKey()), new 
ConsistentBucketIdentifier(e.getValue()))));
+  }
+
+  private static void extractHashingMetadataFromClusteringPlan(String instant, 
HoodieClusteringPlan plan, HoodieTable table,
+      final Set<String> recordPartitions, Map<String, 
HoodieConsistentHashingMetadata> partitionToHashingMeta, Map<String, String> 
partitionToInstant) {
+    for (HoodieClusteringGroup group : plan.getInputGroups()) {
+      Map<String, String> groupMeta = group.getExtraMetadata();
+      String p = 
groupMeta.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+      ValidationUtils.checkState(p != null, "Clustering plan does not have 
partition info, plan: " + plan);
+      // Skip unrelated clustering group
+      if (!recordPartitions.contains(p)) {
+        return;
+      }
+
+      String preInstant = partitionToInstant.putIfAbsent(p, instant);
+      ValidationUtils.checkState(preInstant == null || 
preInstant.equals(instant), "Found a partition: " + p + " with two clustering 
instants");
+      if (!partitionToHashingMeta.containsKey(p)) {
+        Option<HoodieConsistentHashingMetadata> metadataOption = 
ConsistentBucketIndexUtils.loadMetadata(table, p);
+        ValidationUtils.checkState(metadataOption.isPresent(), "Failed to load 
consistent hashing metadata for partition: " + p);
+        partitionToHashingMeta.put(p, metadataOption.get());
+      }
+
+      try {
+        String nodeJson = 
group.getExtraMetadata().get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY);
+        List<ConsistentHashingNode> nodes = 
ConsistentHashingNode.fromJsonString(nodeJson);
+        partitionToHashingMeta.get(p).getChildrenNodes().addAll(nodes);
+      } catch (Exception e) {
+        LOG.error("Failed to parse child nodes in clustering plan.", e);
+        throw new HoodieException("Failed to parse child nodes in clustering 
plan, partition: " + p + ", cluster group: " + group, e);
+      }
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
index 99af0a14819..3aecda8664a 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
@@ -18,42 +18,28 @@
 
 package org.apache.hudi.client.clustering.update.strategy;
 
-import org.apache.hudi.avro.model.HoodieClusteringGroup;
-import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.ConsistentHashingNode;
 import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
-import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
 import org.apache.hudi.table.HoodieTable;
-import 
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy;
 import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import 
org.apache.hudi.table.action.cluster.util.ConsistentHashingUpdateStrategyUtils;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
@@ -65,8 +51,6 @@ import static 
org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
  */
 public class SparkConsistentBucketDuplicateUpdateStrategy<T extends 
HoodieRecordPayload<T>> extends UpdateStrategy<T, HoodieData<HoodieRecord<T>>> {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(SparkConsistentBucketDuplicateUpdateStrategy.class);
-
   public SparkConsistentBucketDuplicateUpdateStrategy(HoodieEngineContext 
engineContext, HoodieTable table, Set<HoodieFileGroupId> 
fileGroupsInPendingClustering) {
     super(engineContext, table, fileGroupsInPendingClustering);
   }
@@ -86,64 +70,21 @@ public class SparkConsistentBucketDuplicateUpdateStrategy<T 
extends HoodieRecord
       return Pair.of(taggedRecordsRDD, Collections.emptySet());
     }
 
-    // Read all pending/ongoing clustering plans
-    List<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPairs =
-        
table.getMetaClient().getActiveTimeline().filterInflightsAndRequested().filter(instant
 -> 
instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).getInstantsAsStream()
-            .map(instant -> 
ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant))
-            .flatMap(o -> o.isPresent() ? Stream.of(o.get()) : Stream.empty())
-            .collect(Collectors.toList());
-
     // Construct child node for each partition & build the bucket identifier
     final Set<String> partitions = new 
HashSet<>(filteredRecordsRDD.map(HoodieRecord::getPartitionPath).distinct().collectAsList());
-    Map<String, HoodieConsistentHashingMetadata> partitionToHashingMeta = new 
HashMap<>();
-    Map<String, String> partitionToInstant = new HashMap<>();
-    for (Pair<HoodieInstant, HoodieClusteringPlan> pair : instantPlanPairs) {
-      String instant = pair.getLeft().getTimestamp();
-      HoodieClusteringPlan plan = pair.getRight();
-      extractHashingMetadataFromClusteringPlan(instant, plan, partitions, 
partitionToHashingMeta, partitionToInstant);
-    }
-    Map<String, ConsistentBucketIdentifier> partitionToIdentifier = 
partitionToHashingMeta.entrySet().stream()
-        .collect(Collectors.toMap(Map.Entry::getKey, e -> new 
ConsistentBucketIdentifier(e.getValue())));
+    Map<String, Pair<String, ConsistentBucketIdentifier>> 
partitionToIdentifier =
+        
ConsistentHashingUpdateStrategyUtils.constructPartitionToIdentifier(partitions, 
table);
 
     // Produce records tagged with new record location
     List<String> indexKeyFields = 
Arrays.asList(table.getConfig().getBucketIndexHashField().split(","));
     HoodieData<HoodieRecord<T>> redirectedRecordsRDD = 
filteredRecordsRDD.map(r -> {
-      ConsistentHashingNode node = 
partitionToIdentifier.get(r.getPartitionPath()).getBucket(r.getKey(), 
indexKeyFields);
+      Pair<String, ConsistentBucketIdentifier> identifierPair = 
partitionToIdentifier.get(r.getPartitionPath());
+      ConsistentHashingNode node = 
identifierPair.getValue().getBucket(r.getKey(), indexKeyFields);
       return tagAsNewRecordIfNeeded(new HoodieAvroRecord(r.getKey(), 
r.getData(), r.getOperation()),
-          Option.of(new 
HoodieRecordLocation(partitionToInstant.get(r.getPartitionPath()), 
FSUtils.createNewFileId(node.getFileIdPrefix(), 0))));
+          Option.ofNullable(new HoodieRecordLocation(identifierPair.getKey(), 
FSUtils.createNewFileId(node.getFileIdPrefix(), 0))));
     });
 
     // Return combined iterator (the original and records with new location)
     return Pair.of(taggedRecordsRDD.union(redirectedRecordsRDD), 
Collections.emptySet());
   }
-
-  private void extractHashingMetadataFromClusteringPlan(String instant, 
HoodieClusteringPlan plan, final Set<String> recordPartitions,
-                                                        Map<String, 
HoodieConsistentHashingMetadata> partitionToHashingMeta, Map<String, String> 
partitionToInstant) {
-    for (HoodieClusteringGroup group : plan.getInputGroups()) {
-      Map<String, String> groupMeta = group.getExtraMetadata();
-      String p = 
groupMeta.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
-      checkState(p != null, "Clustering plan does not has partition info, 
plan: " + plan);
-      // Skip unrelated clustering group
-      if (!recordPartitions.contains(p)) {
-        return;
-      }
-
-      String preInstant = partitionToInstant.putIfAbsent(p, instant);
-      checkState(preInstant == null || preInstant.equals(instant), "Find a 
partition: " + p + " with two clustering instants");
-      if (!partitionToHashingMeta.containsKey(p)) {
-        Option<HoodieConsistentHashingMetadata> metadataOption = 
ConsistentBucketIndexUtils.loadMetadata(table, p);
-        checkState(metadataOption.isPresent(), "Failed to load consistent 
hashing metadata for partition: " + p);
-        partitionToHashingMeta.put(p, metadataOption.get());
-      }
-
-      try {
-        String nodeJson = 
group.getExtraMetadata().get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY);
-        List<ConsistentHashingNode> nodes = 
ConsistentHashingNode.fromJsonString(nodeJson);
-        partitionToHashingMeta.get(p).getChildrenNodes().addAll(nodes);
-      } catch (Exception e) {
-        LOG.error("Failed to parse child nodes in clustering plan", e);
-        throw new HoodieException("Failed to parse child nodes in clustering 
plan, partition: " + p + ", cluster group: " + group, e);
-      }
-    }
-  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index e7a6901b0b2..b68dcbd0698 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.configuration;
 
+import 
org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy;
 import 
org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
 import 
org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy;
@@ -140,10 +141,21 @@ public class OptionsResolver {
     return HoodieIndex.BucketIndexEngineType.valueOf(bucketEngineType);
   }
 
+  /**
+   * Returns whether the table index is consistent bucket index.
+   */
   public static boolean isConsistentHashingBucketIndexType(Configuration conf) 
{
     return isBucketIndexType(conf) && 
getBucketEngineType(conf).equals(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING);
   }
 
+  /**
+   * Returns the default plan strategy class.
+   */
+  public static String getDefaultPlanStrategyClassName(Configuration conf) {
+    return OptionsResolver.isConsistentHashingBucketIndexType(conf) ? 
FlinkConsistentBucketClusteringPlanStrategy.class.getName() :
+        FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS.defaultValue();
+  }
+
   /**
    * Returns whether the source should emit changelog.
    *
@@ -190,7 +202,19 @@ public class OptionsResolver {
    * @param conf The flink configuration.
    */
   public static boolean needsScheduleClustering(Configuration conf) {
-    return isInsertOperation(conf) && 
conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED);
+    if (!conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED)) {
+      return false;
+    }
+    WriteOperationType operationType = 
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+    if (OptionsResolver.isConsistentHashingBucketIndexType(conf)) {
+      // Write pipelines for table with consistent bucket index would detect 
whether clustering service occurs,
+      // and automatically adjust the partitioner and write function if 
clustering service happens.
+      // So it could handle UPSERT.
+      // But it could not handle INSERT case, because insert write would not 
take index into consideration currently.
+      return operationType == WriteOperationType.UPSERT;
+    } else {
+      return operationType == WriteOperationType.INSERT;
+    }
   }
 
   /**
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 5087c814030..24a594d0266 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -103,7 +103,7 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
    */
   private transient Map<String, DataBucket> buckets;
 
-  private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> 
writeFunction;
+  protected transient BiFunction<List<HoodieRecord>, String, 
List<WriteStatus>> writeFunction;
 
   private transient HoodieRecordMerger recordMerger;
 
@@ -246,7 +246,7 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
   /**
    * Data bucket.
    */
-  private static class DataBucket {
+  protected static class DataBucket {
     private final List<DataItem> records;
     private final BufferSizeDetector detector;
     private final String partitionPath;
@@ -430,12 +430,8 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
 
     List<HoodieRecord> records = bucket.writeBuffer();
     ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has 
no buffering records");
-    if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
-      records = (List<HoodieRecord>) FlinkWriteHelper.newInstance()
-          .deduplicateRecords(records, null, -1, 
this.writeClient.getConfig().getSchema(), 
this.writeClient.getConfig().getProps(), recordMerger);
-    }
-    bucket.preWrite(records);
-    final List<WriteStatus> writeStatus = new 
ArrayList<>(writeFunction.apply(records, instant));
+    records = deduplicateRecordsIfNeeded(records);
+    final List<WriteStatus> writeStatus = writeBucket(instant, bucket, 
records);
     records.clear();
     final WriteMetadataEvent event = WriteMetadataEvent.builder()
         .taskID(taskID)
@@ -466,12 +462,8 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
           .forEach(bucket -> {
             List<HoodieRecord> records = bucket.writeBuffer();
             if (records.size() > 0) {
-              if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
-                records = (List<HoodieRecord>) FlinkWriteHelper.newInstance()
-                    .deduplicateRecords(records, null, -1, 
this.writeClient.getConfig().getSchema(), 
this.writeClient.getConfig().getProps(), recordMerger);
-              }
-              bucket.preWrite(records);
-              writeStatus.addAll(writeFunction.apply(records, currentInstant));
+              records = deduplicateRecordsIfNeeded(records);
+              writeStatus.addAll(writeBucket(currentInstant, bucket, records));
               records.clear();
               bucket.reset();
             }
@@ -496,4 +488,18 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
     // blocks flushing until the coordinator starts a new instant
     this.confirming = true;
   }
+
+  protected List<WriteStatus> writeBucket(String instant, DataBucket bucket, 
List<HoodieRecord> records) {
+    bucket.preWrite(records);
+    return writeFunction.apply(records, instant);
+  }
+
+  private List<HoodieRecord> deduplicateRecordsIfNeeded(List<HoodieRecord> 
records) {
+    if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
+      return FlinkWriteHelper.newInstance()
+          .deduplicateRecords(records, null, -1, 
this.writeClient.getConfig().getSchema(), 
this.writeClient.getConfig().getProps(), recordMerger);
+    } else {
+      return records;
+    }
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 5a7889d327e..63b9c4b3742 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -40,7 +40,7 @@ import java.util.Map;
 import java.util.Set;
 
 /**
- * A stream write function with bucket hash index.
+ * A stream write function with simple bucket hash index.
  *
  * <p>The task holds a fresh new local index: {(partition + bucket number) 
&rarr fileId} mapping, this index
  * is used for deciding whether the incoming records in an UPDATE or INSERT.
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
index a48ea44ddc4..3c836bdf8ad 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.bucket;
 
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.common.AbstractWriteOperator;
 import org.apache.hudi.sink.common.WriteOperatorFactory;
 
@@ -31,7 +32,9 @@ import org.apache.flink.configuration.Configuration;
 public class BucketStreamWriteOperator<I> extends AbstractWriteOperator<I> {
 
   public BucketStreamWriteOperator(Configuration conf) {
-    super(new BucketStreamWriteFunction<>(conf));
+    super(OptionsResolver.isConsistentHashingBucketIndexType(conf)
+        ? new ConsistentBucketStreamWriteFunction<>(conf)
+        : new BucketStreamWriteFunction<>(conf));
   }
 
   public static <I> WriteOperatorFactory<I> getFactory(Configuration conf) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java
index 5fa4c48ae19..f5dd3def770 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java
@@ -25,6 +25,9 @@ import 
org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -35,6 +38,9 @@ import 
org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
 import org.apache.hudi.util.FlinkWriteClients;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
@@ -45,12 +51,13 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 /**
  * The function to tag each incoming record with a location of a file based on 
consistent bucket index.
  */
-public class ConsistentBucketAssignFunction extends 
ProcessFunction<HoodieRecord, HoodieRecord> {
+public class ConsistentBucketAssignFunction extends 
ProcessFunction<HoodieRecord, HoodieRecord> implements CheckpointedFunction {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ConsistentBucketAssignFunction.class);
 
@@ -59,6 +66,7 @@ public class ConsistentBucketAssignFunction extends 
ProcessFunction<HoodieRecord
   private final int bucketNum;
   private transient HoodieFlinkWriteClient writeClient;
   private transient Map<String, ConsistentBucketIdentifier> 
partitionToIdentifier;
+  private transient String lastRefreshInstant = HoodieTimeline.INIT_INSTANT_TS;
   private final int maxRetries = 10;
   private final long maxWaitTimeInMs = 1000;
 
@@ -124,4 +132,24 @@ public class ConsistentBucketAssignFunction extends 
ProcessFunction<HoodieRecord
       return new ConsistentBucketIdentifier(metadata);
     });
   }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) 
throws Exception {
+    HoodieTimeline timeline = 
writeClient.getHoodieTable().getActiveTimeline().getCompletedReplaceTimeline().findInstantsAfter(lastRefreshInstant);
+    if (!timeline.empty()) {
+      for (HoodieInstant instant : timeline.getInstants()) {
+        HoodieReplaceCommitMetadata commitMetadata = 
HoodieReplaceCommitMetadata.fromBytes(
+            timeline.getInstantDetails(instant).get(), 
HoodieReplaceCommitMetadata.class);
+        Set<String> affectedPartitions = 
commitMetadata.getPartitionToReplaceFileIds().keySet();
+        LOG.info("Clearing up cached hashing metadata because a new replace 
commit was found.\n Instant: {}.\n Affected Partitions: {}.", lastRefreshInstant, 
affectedPartitions);
+        affectedPartitions.forEach(this.partitionToIdentifier::remove);
+      }
+      this.lastRefreshInstant = timeline.lastInstant().get().getTimestamp();
+    }
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext 
functionInitializationContext) throws Exception {
+    // no operation
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
new file mode 100644
index 00000000000..9372d70b68e
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bucket;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.StreamWriteFunction;
+import 
org.apache.hudi.sink.clustering.update.strategy.FlinkConsistentBucketUpdateStrategy;
+
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A stream write function that writes with the consistent hashing bucket index.
+ *
+ * @param <I> the input type
+ */
+public class ConsistentBucketStreamWriteFunction<I> extends 
StreamWriteFunction<I> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ConsistentBucketStreamWriteFunction.class);
+
+  private transient FlinkConsistentBucketUpdateStrategy updateStrategy;
+
+  /**
+   * Constructs a ConsistentBucketStreamWriteFunction.
+   *
+   * @param config The config options
+   */
+  public ConsistentBucketStreamWriteFunction(Configuration config) {
+    super(config);
+  }
+
+  @Override
+  public void open(Configuration parameters) throws IOException {
+    super.open(parameters);
+    List<String> indexKeyFields = 
Arrays.asList(config.getString(FlinkOptions.INDEX_KEY_FIELD).split(","));
+    this.updateStrategy = new 
FlinkConsistentBucketUpdateStrategy(this.writeClient, indexKeyFields);
+  }
+
+  @Override
+  public void snapshotState() {
+    super.snapshotState();
+    updateStrategy.reset();
+  }
+
+  @Override
+  protected List<WriteStatus> writeBucket(String instant, DataBucket bucket, 
List<HoodieRecord> records) {
+    updateStrategy.initialize(this.writeClient);
+    bucket.preWrite(records);
+    Pair<List<Pair<List<HoodieRecord>, String>>, Set<HoodieFileGroupId>> 
recordListFgPair =
+        updateStrategy.handleUpdate(Collections.singletonList(Pair.of(records, 
instant)));
+    return recordListFgPair.getKey().stream().flatMap(
+        recordsInstantPair -> 
writeFunction.apply(recordsInstantPair.getLeft(), 
recordsInstantPair.getRight()).stream()
+    ).collect(Collectors.toList());
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/update/strategy/FlinkConsistentBucketUpdateStrategy.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/update/strategy/FlinkConsistentBucketUpdateStrategy.java
new file mode 100644
index 00000000000..f9a58a7318d
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/update/strategy/FlinkConsistentBucketUpdateStrategy.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering.update.strategy;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
+import 
org.apache.hudi.table.action.cluster.util.ConsistentHashingUpdateStrategyUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Update strategy for the consistent hashing bucket index. If updates to file groups that are
+ * under clustering are identified, the current batch of records is routed to both the old and
+ * the new file groups (i.e., dual write).
+ */
+public class FlinkConsistentBucketUpdateStrategy<T extends 
HoodieRecordPayload> extends UpdateStrategy<T, List<Pair<List<HoodieRecord>, 
String>>> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkConsistentBucketUpdateStrategy.class);
+
+  private boolean initialized = false;
+  private List<String> indexKeyFields;
+  private Map<String, Pair<String, ConsistentBucketIdentifier>> 
partitionToIdentifier;
+  private String lastRefreshInstant = HoodieTimeline.INIT_INSTANT_TS;
+
+  public FlinkConsistentBucketUpdateStrategy(HoodieFlinkWriteClient 
writeClient, List<String> indexKeyFields) {
+    super(writeClient.getEngineContext(), writeClient.getHoodieTable(), 
Collections.emptySet());
+    this.indexKeyFields = indexKeyFields;
+    this.partitionToIdentifier = new HashMap<>();
+  }
+
+  public void initialize(HoodieFlinkWriteClient writeClient) {
+    if (initialized) {
+      return;
+    }
+    HoodieFlinkTable table = writeClient.getHoodieTable();
+    List<HoodieInstant> instants = 
ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
+    if (!instants.isEmpty()) {
+      HoodieInstant latestPendingReplaceInstant = instants.get(instants.size() 
- 1);
+      if 
(latestPendingReplaceInstant.getTimestamp().compareTo(lastRefreshInstant) > 0) {
+        LOG.info("Found new pending replacement commit. Last pending 
replacement commit is {}.", latestPendingReplaceInstant);
+        this.table = table;
+        this.fileGroupsInPendingClustering = 
table.getFileSystemView().getFileGroupsInPendingClustering()
+            .map(Pair::getKey).collect(Collectors.toSet());
+        // TODO throw exception if exists bucket merge plan
+        this.lastRefreshInstant = latestPendingReplaceInstant.getTimestamp();
+        this.partitionToIdentifier.clear();
+      }
+    }
+    this.initialized = true;
+  }
+
+  public void reset() {
+    initialized = false;
+  }
+
+  @Override
+  public Pair<List<Pair<List<HoodieRecord>, String>>, Set<HoodieFileGroupId>> 
handleUpdate(List<Pair<List<HoodieRecord>, String>> recordsList) {
+    ValidationUtils.checkArgument(initialized, "Strategy has not been 
initialized");
+    ValidationUtils.checkArgument(recordsList.size() == 1);
+
+    Pair<List<HoodieRecord>, String> recordsInstantPair = recordsList.get(0);
+    HoodieRecord sampleRecord = recordsInstantPair.getLeft().get(0);
+    HoodieFileGroupId fileId = new 
HoodieFileGroupId(sampleRecord.getPartitionPath(), 
sampleRecord.getCurrentLocation().getFileId());
+    if (fileGroupsInPendingClustering.isEmpty() || 
!fileGroupsInPendingClustering.contains(fileId)) {
+      return Pair.of(recordsList, Collections.singleton(fileId));
+    }
+
+    return doHandleUpdate(fileId, recordsInstantPair);
+  }
+
+  private Pair<List<Pair<List<HoodieRecord>, String>>, Set<HoodieFileGroupId>> 
doHandleUpdate(HoodieFileGroupId fileId, Pair<List<HoodieRecord>, String> 
recordsInstantPair) {
+    Pair<String, ConsistentBucketIdentifier> bucketIdentifierPair = 
getBucketIdentifierOfPartition(fileId.getPartitionPath());
+    String clusteringInstant = bucketIdentifierPair.getLeft();
+    ConsistentBucketIdentifier identifier = bucketIdentifierPair.getRight();
+
+    // Construct the records list routed to the new file groups according to the new bucket identifier
+    Map<String, List<HoodieRecord>> fileIdToRecords = 
recordsInstantPair.getLeft().stream().map(HoodieRecord::newInstance)
+        .collect(Collectors.groupingBy(r -> identifier.getBucket(r.getKey(), 
indexKeyFields).getFileIdPrefix()));
+
+    // Tag first record with the corresponding fileId & clusteringInstantTime
+    List<Pair<List<HoodieRecord>, String>> recordsList = new ArrayList<>();
+    Set<HoodieFileGroupId> fgs = new LinkedHashSet<>();
+    for (Map.Entry<String, List<HoodieRecord>> e : fileIdToRecords.entrySet()) 
{
+      String newFileId = FSUtils.createNewFileId(e.getKey(), 0);
+      patchFileIdToRecords(e.getValue(), newFileId);
+      recordsList.add(Pair.of(e.getValue(), clusteringInstant));
+      fgs.add(new HoodieFileGroupId(fileId.getPartitionPath(), newFileId));
+    }
+    LOG.info("Apply duplicate update for FileGroup {}, routing records to: 
{}.", fileId, String.join(",", fileIdToRecords.keySet()));
+    // TODO add option to skip dual update, i.e., write updates only to the 
new file group
+    recordsList.add(recordsInstantPair);
+    fgs.add(fileId);
+    return Pair.of(recordsList, fgs);
+  }
+
+  private Pair<String, ConsistentBucketIdentifier> 
getBucketIdentifierOfPartition(String partition) {
+    return partitionToIdentifier.computeIfAbsent(partition, p -> 
ConsistentHashingUpdateStrategyUtils.constructPartitionToIdentifier(Collections.singleton(p),
 table).get(p)
+    );
+  }
+
+  /**
+   * Rewrites the first record of the list with the given fileId.
+   */
+  private void patchFileIdToRecords(List<HoodieRecord> records, String fileId) 
{
+    HoodieRecord first = records.get(0);
+    HoodieRecord record = new HoodieAvroRecord<>(first.getKey(), 
(HoodieRecordPayload) first.getData(), first.getOperation());
+    HoodieRecordLocation newLoc = new HoodieRecordLocation("U", fileId);
+    record.setCurrentLocation(newLoc);
+    records.set(0, record);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 900ccc77670..5d945d07aa1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -366,7 +366,7 @@ public class Pipelines {
               .transform(
                   opName("consistent_bucket_write", conf),
                   TypeInformation.of(Object.class),
-                  StreamWriteOperator.getFactory(conf))
+                  BucketStreamWriteOperator.getFactory(conf))
               .uid(opUID("consistent_bucket_write", conf))
               .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
         default:
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
index 2d47bb8a1b4..75d4ea79815 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
@@ -53,9 +53,10 @@ public class ClusteringUtil {
       HoodieIndex.BucketIndexEngineType bucketIndexEngineType = 
OptionsResolver.getBucketEngineType(conf);
       switch (bucketIndexEngineType) {
         case SIMPLE:
-          throw new UnsupportedOperationException("Clustering is not supported 
for simple bucket index.");
+          throw new HoodieNotSupportedException("Clustering is not supported 
for simple bucket index.");
         case CONSISTENT_HASHING:
-          if 
(!conf.get(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS).equalsIgnoreCase(FlinkConsistentBucketClusteringPlanStrategy.class.getName()))
 {
+          String clusteringPlanStrategyClass = 
conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS, 
OptionsResolver.getDefaultPlanStrategyClassName(conf));
+          if 
(!clusteringPlanStrategyClass.equalsIgnoreCase(FlinkConsistentBucketClusteringPlanStrategy.class.getName()))
 {
             throw new HoodieNotSupportedException(
                 "CLUSTERING_PLAN_STRATEGY_CLASS should be set to " + 
FlinkConsistentBucketClusteringPlanStrategy.class.getName() + " in order to 
work with Consistent Hashing Bucket Index.");
           }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 1835e9c894b..960b85d95ab 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -40,7 +40,6 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
-import 
org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -164,7 +163,7 @@ public class FlinkWriteClients {
             .withClusteringConfig(
                 HoodieClusteringConfig.newBuilder()
                     
.withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED))
-                    
.withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS,
 getDefaultPlanStrategyClassName(conf)))
+                    
.withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS,
 OptionsResolver.getDefaultPlanStrategyClassName(conf)))
                     .withClusteringPlanPartitionFilterMode(
                         
ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME)))
                     
.withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS))
@@ -246,9 +245,4 @@ public class FlinkWriteClients {
     }
     return writeConfig;
   }
-
-  private static String getDefaultPlanStrategyClassName(Configuration conf) {
-    return OptionsResolver.isConsistentHashingBucketIndexType(conf) ? 
FlinkConsistentBucketClusteringPlanStrategy.class.getName() :
-        FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS.defaultValue();
-  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index 4683955d8e0..d968362ca2a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -20,6 +20,8 @@ package org.apache.hudi.sink;
 
 import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.utils.TestData;
 
@@ -156,6 +158,44 @@ public class TestWriteMergeOnRead extends 
TestWriteCopyOnWrite {
     // insert async clustering is only valid for cow table.
   }
 
+  @Test
+  public void testConsistentBucketIndex() throws Exception {
+    conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
+    conf.setString(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE, 
"CONSISTENT_HASHING");
+    conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
+    conf.setString(HoodieIndexConfig.BUCKET_INDEX_MAX_NUM_BUCKETS.key(), "8");
+    // Enable inline resize scheduling
+    conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
+    // Manually set the max commits to trigger clustering quickly
+    conf.setString(HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key(), 
"1");
+    // Manually set the split threshold to trigger split in the clustering
+    conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+    conf.setString(HoodieIndexConfig.BUCKET_SPLIT_THRESHOLD.key(), 
String.valueOf(1 / 1024.0 / 1024.0));
+    conf.set(FlinkOptions.PRE_COMBINE, true);
+    HashMap<String, String> mergedExpected = new HashMap<>(EXPECTED1);
+    mergedExpected.put("par1", "[id1,par1,id1,Danny,22,4,par1, 
id2,par1,id2,Stephen,33,2,par1]");
+    TestHarness.instance().preparePipeline(tempFile, conf)
+        .consume(TestData.DATA_SET_INSERT)
+        .emptyEventBuffer()
+        .checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1)
+        .checkWrittenData(EXPECTED1, 4)
+        .consume(TestData.DATA_SET_DISORDER_INSERT)
+        .emptyEventBuffer()
+        .checkpoint(2)
+        .assertNextEvent()
+        .checkpointComplete(2)
+        .checkWrittenData(mergedExpected, 4)
+        .consume(TestData.DATA_SET_SINGLE_INSERT)
+        .emptyEventBuffer()
+        .checkpoint(3)
+        .assertNextEvent()
+        .checkpointComplete(3)
+        .checkWrittenData(mergedExpected, 4)
+        .end();
+  }
+
   @Override
   protected Map<String, String> getExpectedBeforeCheckpointComplete() {
     return EXPECTED1;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
index 2eb6e4b3626..4882552f0a7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
@@ -22,6 +22,8 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.exception.HoodieException;
@@ -86,18 +88,34 @@ public class ITTestConsistentBucketStreamWrite extends 
TestLogger {
     conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
     conf.setString(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE, 
"CONSISTENT_HASHING");
     conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
-    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
     conf.setString(FlinkOptions.TABLE_TYPE, 
HoodieTableType.MERGE_ON_READ.name());
     testWriteToHoodie(conf, "mor_write", 1, EXPECTED);
   }
 
+  @Test
+  public void testWriteMORWithResizePlan() throws Exception {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.toURI().toString());
+    conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
+    conf.setString(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE, 
"CONSISTENT_HASHING");
+    conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
+    conf.setString(HoodieIndexConfig.BUCKET_INDEX_MAX_NUM_BUCKETS.key(), "8");
+    conf.setString(FlinkOptions.TABLE_TYPE, 
HoodieTableType.MERGE_ON_READ.name());
+    // Enable inline resize scheduling
+    conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
+    // Manually set the max commits to trigger clustering quickly
+    conf.setString(HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key(), 
"1");
+    // Manually set the split threshold to trigger split in the clustering
+    conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+    conf.setString(HoodieIndexConfig.BUCKET_SPLIT_THRESHOLD.key(), 
String.valueOf(1 / 1024.0 / 1024.0));
+    testWriteToHoodie(conf, "mor_write", 1, EXPECTED);
+  }
+
   @Test
   public void testBulkInsert() {
     Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.toURI().toString());
     conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
     conf.setString(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE, 
"CONSISTENT_HASHING");
     conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
-    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
     conf.setString(FlinkOptions.TABLE_TYPE, 
HoodieTableType.MERGE_ON_READ.name());
     conf.setString(FlinkOptions.OPERATION, "bulk_insert");
 
@@ -113,7 +131,6 @@ public class ITTestConsistentBucketStreamWrite extends 
TestLogger {
     conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
     conf.setString(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE, 
"CONSISTENT_HASHING");
     conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
-    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
     conf.setString(FlinkOptions.TABLE_TYPE, 
HoodieTableType.MERGE_ON_READ.name());
     conf.setString(FlinkOptions.OPERATION, "INSERT_OVERWRITE");
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
index aa318efa8da..df648df9ac0 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
@@ -59,24 +59,24 @@ import java.util.concurrent.CompletableFuture;
  * @param <I> Input type
  */
 public class BucketStreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
-  private final Configuration conf;
+  protected final Configuration conf;
 
   private final IOManager ioManager;
-  private final StreamingRuntimeContext runtimeContext;
+  protected final StreamingRuntimeContext runtimeContext;
   private final MockOperatorEventGateway gateway;
   private final MockOperatorCoordinatorContext coordinatorContext;
-  private final StreamWriteOperatorCoordinator coordinator;
-  private final MockStateInitializationContext stateInitializationContext;
+  protected final StreamWriteOperatorCoordinator coordinator;
+  protected final MockStateInitializationContext stateInitializationContext;
 
   /**
    * Function that converts row data to HoodieRecord.
    */
-  private RowDataToHoodieFunction<RowData, HoodieRecord<?>> toHoodieFunction;
+  protected RowDataToHoodieFunction<RowData, HoodieRecord<?>> toHoodieFunction;
 
   /**
    * Stream write function.
    */
-  private StreamWriteFunction<HoodieRecord<?>> writeFunction;
+  protected StreamWriteFunction<HoodieRecord<?>> writeFunction;
 
   private CompactFunctionWrapper compactFunctionWrapper;
 
@@ -195,7 +195,7 @@ public class BucketStreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<
   // -------------------------------------------------------------------------
 
   private void setupWriteFunction() throws Exception {
-    writeFunction = new BucketStreamWriteFunction<>(conf);
+    writeFunction = createWriteFunction();
     writeFunction.setRuntimeContext(runtimeContext);
     writeFunction.setOperatorEventGateway(gateway);
     writeFunction.initializeState(this.stateInitializationContext);
@@ -204,4 +204,8 @@ public class BucketStreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<
     // handle the bootstrap event
     coordinator.handleEventFromOperator(0, getNextEvent());
   }
+
+  protected StreamWriteFunction<HoodieRecord<?>> createWriteFunction() {
+    return new BucketStreamWriteFunction<>(conf);
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ConsistentBucketStreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ConsistentBucketStreamWriteFunctionWrapper.java
new file mode 100644
index 00000000000..02591bbb412
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ConsistentBucketStreamWriteFunctionWrapper.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.sink.StreamWriteFunction;
+import org.apache.hudi.sink.bucket.ConsistentBucketAssignFunction;
+import org.apache.hudi.sink.bucket.ConsistentBucketStreamWriteFunction;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import 
org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
+import org.apache.flink.table.data.RowData;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A wrapper class to manipulate the {@link 
ConsistentBucketStreamWriteFunction} instance for testing.
+ *
+ * @param <I> Input type
+ */
+public class ConsistentBucketStreamWriteFunctionWrapper<I> extends 
BucketStreamWriteFunctionWrapper<I> {
+
+  private ConsistentBucketAssignFunction assignFunction;
+
+  public ConsistentBucketStreamWriteFunctionWrapper(String tablePath) throws 
Exception {
+    this(tablePath, TestConfigurations.getDefaultConf(tablePath));
+  }
+
+  public ConsistentBucketStreamWriteFunctionWrapper(String tablePath, 
Configuration conf) throws Exception {
+    super(tablePath, conf);
+  }
+
+  @Override
+  public void openFunction() throws Exception {
+    super.openFunction();
+    assignFunction = new ConsistentBucketAssignFunction(conf);
+    assignFunction.setRuntimeContext(runtimeContext);
+    assignFunction.open(conf);
+  }
+
+  @Override
+  public void invoke(I record) throws Exception {
+    HoodieRecord hoodieRecord = toHoodieFunction.map((RowData) record);
+    ScalaCollector<HoodieRecord> collector = ScalaCollector.getInstance();
+    assignFunction.processElement(hoodieRecord, null, collector);
+    writeFunction.processElement(collector.getVal(), null, null);
+  }
+
+  @Override
+  protected StreamWriteFunction<HoodieRecord<?>> createWriteFunction() {
+    return new ConsistentBucketStreamWriteFunction<>(conf);
+  }
+
+  @Override
+  public void checkpointFunction(long checkpointId) throws Exception {
+    // checkpoint the coordinator first
+    FunctionSnapshotContext functionSnapshotContext = new 
MockFunctionSnapshotContext(checkpointId);
+    this.coordinator.checkpointCoordinator(checkpointId, new 
CompletableFuture<>());
+    writeFunction.snapshotState(functionSnapshotContext);
+    assignFunction.snapshotState(functionSnapshotContext);
+    
stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ScalaCollector.java
similarity index 56%
copy from 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
copy to 
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ScalaCollector.java
index a48ea44ddc4..681c6de2a89 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ScalaCollector.java
@@ -16,25 +16,31 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.sink.bucket;
+package org.apache.hudi.sink.utils;
 
-import org.apache.hudi.sink.common.AbstractWriteOperator;
-import org.apache.hudi.sink.common.WriteOperatorFactory;
-
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
 
 /**
- * Operator for {@link BucketStreamWriteFunction}.
- *
- * @param <I> The input type
+ * A mock {@link Collector} that used in  {@link TestFunctionWrapper}.
  */
-public class BucketStreamWriteOperator<I> extends AbstractWriteOperator<I> {
+class ScalaCollector<T> implements Collector<T> {
+  private T val;
+
+  public static <T> ScalaCollector<T> getInstance() {
+    return new ScalaCollector<>();
+  }
+
+  @Override
+  public void collect(T t) {
+    this.val = t;
+  }
 
-  public BucketStreamWriteOperator(Configuration conf) {
-    super(new BucketStreamWriteFunction<>(conf));
+  @Override
+  public void close() {
+    this.val = null;
   }
 
-  public static <I> WriteOperatorFactory<I> getFactory(Configuration conf) {
-    return WriteOperatorFactory.instance(conf, new 
BucketStreamWriteOperator<>(conf));
+  public T getVal() {
+    return val;
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 22280bb3129..36b0dc32757 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -290,26 +290,4 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
       return this.updateKeys.contains(key);
     }
   }
-
-  private static class ScalaCollector<T> implements Collector<T> {
-    private T val;
-
-    public static <T> ScalaCollector<T> getInstance() {
-      return new ScalaCollector<>();
-    }
-
-    @Override
-    public void collect(T t) {
-      this.val = t;
-    }
-
-    @Override
-    public void close() {
-      this.val = null;
-    }
-
-    public T getVal() {
-      return val;
-    }
-  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 8c8ecf71061..db9cd65b9f1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -34,6 +34,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.utils.BucketStreamWriteFunctionWrapper;
+import org.apache.hudi.sink.utils.ConsistentBucketStreamWriteFunctionWrapper;
 import org.apache.hudi.sink.utils.InsertFunctionWrapper;
 import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
 import org.apache.hudi.sink.utils.TestFunctionWrapper;
@@ -539,7 +540,11 @@ public class TestData {
     if (OptionsResolver.isAppendMode(conf)) {
       return new InsertFunctionWrapper<>(basePath, conf);
     } else if (OptionsResolver.isBucketIndexType(conf)) {
-      return new BucketStreamWriteFunctionWrapper<>(basePath, conf);
+      if (OptionsResolver.isConsistentHashingBucketIndexType(conf)) {
+        return new ConsistentBucketStreamWriteFunctionWrapper<>(basePath, 
conf);
+      } else {
+        return new BucketStreamWriteFunctionWrapper<>(basePath, conf);
+      }
     } else {
       return new StreamWriteFunctionWrapper<>(basePath, conf);
     }

Reply via email to