This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new dbc0b433425 [HUDI-6328] Flink support generate resize plan for consistent bucket index (#9030)
dbc0b433425 is described below

commit dbc0b433425fd57622b7028257ec6083b57394ce
Author: Jing Zhang <[email protected]>
AuthorDate: Mon Jun 26 17:23:53 2023 +0800

    [HUDI-6328] Flink support generate resize plan for consistent bucket index (#9030)
---
 .../apache/hudi/config/HoodieClusteringConfig.java |  51 ++--
 ...istentHashingBucketClusteringPlanStrategy.java} | 125 ++++++----
 .../apache/hudi/config/TestHoodieWriteConfig.java  |   2 +-
 ...linkConsistentBucketClusteringPlanStrategy.java |  56 +++++
 ...parkConsistentBucketClusteringPlanStrategy.java | 276 +--------------------
 ...onsistentBucketClusteringExecutionStrategy.java |   6 +-
 ...arkConsistentBucketDuplicateUpdateStrategy.java |   7 +-
 .../bucket/HoodieSparkConsistentBucketIndex.java   |   8 +-
 ...parkConsistentBucketClusteringPlanStrategy.java |   6 +-
 .../apache/hudi/configuration/OptionsResolver.java |   4 +
 .../java/org/apache/hudi/sink/utils/Pipelines.java |   2 +-
 .../java/org/apache/hudi/util/ClusteringUtil.java  |  18 +-
 .../org/apache/hudi/util/FlinkWriteClients.java    |  10 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |   1 +
 .../ITTestFlinkConsistentHashingClustering.java    | 175 +++++++++++++
 15 files changed, 378 insertions(+), 369 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 9732c52ac42..cafed2febc6 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -56,6 +56,8 @@ public class HoodieClusteringConfig extends HoodieConfig {
       
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy";
   public static final String FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
       
"org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy";
+  public static final String FLINK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY =
+      
"org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy";
   public static final String SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY =
       
"org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy";
   public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
@@ -619,19 +621,8 @@ public class HoodieClusteringConfig extends HoodieConfig {
     }
 
     private void setDefaults() {
-      // Consistent hashing bucket index
-      if (clusteringConfig.contains(HoodieIndexConfig.INDEX_TYPE.key())
-          && 
clusteringConfig.contains(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key())
-          && 
clusteringConfig.getString(HoodieIndexConfig.INDEX_TYPE.key()).equalsIgnoreCase(HoodieIndex.IndexType.BUCKET.name())
-          && 
clusteringConfig.getString(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key()).equalsIgnoreCase(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name()))
 {
-        clusteringConfig.setDefaultValue(PLAN_STRATEGY_CLASS_NAME, 
SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY);
-        clusteringConfig.setDefaultValue(EXECUTION_STRATEGY_CLASS_NAME, 
SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY);
-      } else {
-        clusteringConfig.setDefaultValue(
-            PLAN_STRATEGY_CLASS_NAME, 
getDefaultPlanStrategyClassName(engineType));
-        clusteringConfig.setDefaultValue(
-            EXECUTION_STRATEGY_CLASS_NAME, 
getDefaultExecutionStrategyClassName(engineType));
-      }
+      clusteringConfig.setDefaultValue(PLAN_STRATEGY_CLASS_NAME, 
getDefaultPlanStrategyClassName(engineType));
+      clusteringConfig.setDefaultValue(EXECUTION_STRATEGY_CLASS_NAME, 
getDefaultExecutionStrategyClassName(engineType));
       clusteringConfig.setDefaults(HoodieClusteringConfig.class.getName());
     }
 
@@ -642,27 +633,35 @@ public class HoodieClusteringConfig extends HoodieConfig {
               + "schedule inline clustering (%s) can be enabled. Both can't be 
set to true at the same time. %s,%s", 
HoodieClusteringConfig.INLINE_CLUSTERING.key(),
           HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), 
inlineCluster, inlineClusterSchedule));
 
-      if (engineType.equals(EngineType.FLINK)) {
-        // support resize for Flink to unlock the validation.
-        return;
+      if (isConsistentHashingBucketIndex()) {
+        String planStrategy = 
clusteringConfig.getString(PLAN_STRATEGY_CLASS_NAME);
+        if (engineType == EngineType.FLINK) {
+          
ValidationUtils.checkArgument(planStrategy.equalsIgnoreCase(FLINK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY),
+              "Consistent hashing bucket index only supports clustering plan 
strategy : " + FLINK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY);
+        } else {
+          ValidationUtils.checkArgument(
+              
planStrategy.equalsIgnoreCase(SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY),
+              "Consistent hashing bucket index only supports clustering plan 
strategy : " + SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY);
+          ValidationUtils.checkArgument(
+              
clusteringConfig.getString(EXECUTION_STRATEGY_CLASS_NAME).equals(SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY),
+              "Consistent hashing bucket index only supports clustering 
execution strategy : " + SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY);
+        }
       }
-      // Consistent hashing bucket index
-      if (clusteringConfig.contains(HoodieIndexConfig.INDEX_TYPE.key())
+    }
+
+    private boolean isConsistentHashingBucketIndex() {
+      return clusteringConfig.contains(HoodieIndexConfig.INDEX_TYPE.key())
           && 
clusteringConfig.contains(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key())
           && 
clusteringConfig.getString(HoodieIndexConfig.INDEX_TYPE.key()).equalsIgnoreCase(HoodieIndex.IndexType.BUCKET.name())
-          && 
clusteringConfig.getString(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key()).equalsIgnoreCase(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name()))
 {
-        
ValidationUtils.checkArgument(clusteringConfig.getString(PLAN_STRATEGY_CLASS_NAME).equals(SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY),
-            "Consistent hashing bucket index only supports clustering plan 
strategy : " + SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY);
-        
ValidationUtils.checkArgument(clusteringConfig.getString(EXECUTION_STRATEGY_CLASS_NAME).equals(SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY),
-            "Consistent hashing bucket index only supports clustering 
execution strategy : " + SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY);
-      }
+          && 
clusteringConfig.getString(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key()).equalsIgnoreCase(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name());
     }
 
     private String getDefaultPlanStrategyClassName(EngineType engineType) {
       switch (engineType) {
         case SPARK:
-          return SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
+          return isConsistentHashingBucketIndex() ? 
SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY : 
SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
         case FLINK:
+          return isConsistentHashingBucketIndex() ? 
FLINK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY : 
FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
         case JAVA:
           return JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
         default:
@@ -673,7 +672,7 @@ public class HoodieClusteringConfig extends HoodieConfig {
     private String getDefaultExecutionStrategyClassName(EngineType engineType) 
{
       switch (engineType) {
         case SPARK:
-          return SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY;
+          return isConsistentHashingBucketIndex() ? 
SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY : 
SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY;
         case FLINK:
         case JAVA:
           return JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY;
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkConsistentBucketClusteringPlanStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
similarity index 82%
copy from 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkConsistentBucketClusteringPlanStrategy.java
copy to 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
index a49ab1ddf4b..59f9fcb81d1 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkConsistentBucketClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
@@ -16,18 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.client.clustering.plan.strategy;
+package org.apache.hudi.table.action.cluster.strategy;
 
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
-import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.ConsistentHashingNode;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.TableFileSystemView;
@@ -40,11 +37,9 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
 import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
-import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
+import org.apache.hudi.index.bucket.HoodieConsistentBucketIndex;
 import org.apache.hudi.table.HoodieTable;
-import 
org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
 
-import org.apache.spark.api.java.JavaRDD;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,20 +55,23 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 /**
- * Clustering plan strategy specifically for consistent bucket index
+ * Clustering plan strategy specifically for consistent bucket index.
  */
-public class SparkConsistentBucketClusteringPlanStrategy<T extends 
HoodieRecordPayload<T>>
-    extends PartitionAwareClusteringPlanStrategy<T, JavaRDD<HoodieRecord<T>>, 
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+public abstract class BaseConsistentHashingBucketClusteringPlanStrategy<T 
extends HoodieRecordPayload, I, K, O>
+    extends PartitionAwareClusteringPlanStrategy<T, I, K, O> {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(SparkConsistentBucketClusteringPlanStrategy.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseConsistentHashingBucketClusteringPlanStrategy.class);
 
   public static final String METADATA_PARTITION_KEY = 
"clustering.group.partition";
   public static final String METADATA_CHILD_NODE_KEY = 
"clustering.group.child.node";
   public static final String METADATA_SEQUENCE_NUMBER_KEY = 
"clustering.group.sequence.no";
 
-  public SparkConsistentBucketClusteringPlanStrategy(HoodieTable table, 
HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
+  public BaseConsistentHashingBucketClusteringPlanStrategy(
+      HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig 
writeConfig) {
     super(table, engineContext, writeConfig);
-    validate();
+    ValidationUtils.checkArgument(
+        getHoodieTable().getIndex() instanceof HoodieConsistentBucketIndex,
+        this.getClass().getName() + " is only applicable to table with 
consistent hash index.");
   }
 
   /**
@@ -93,6 +91,64 @@ public class SparkConsistentBucketClusteringPlanStrategy<T 
extends HoodieRecordP
     return true;
   }
 
+  /**
+   * Generate cluster group based on split, merge and sort rules
+   */
+  @Override
+  protected Stream<HoodieClusteringGroup> 
buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> 
fileSlices) {
+    Option<HoodieConsistentHashingMetadata> metadata = 
ConsistentBucketIndexUtils.loadMetadata(getHoodieTable(), partitionPath);
+    ValidationUtils.checkArgument(metadata.isPresent(), "Metadata is empty for 
partition: " + partitionPath);
+    ConsistentBucketIdentifier identifier = new 
ConsistentBucketIdentifier(metadata.get());
+
+    // Apply split rule
+    int splitSlot = getWriteConfig().getBucketIndexMaxNumBuckets() - 
identifier.getNumBuckets();
+    Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> splitResult =
+        buildSplitClusteringGroups(identifier, fileSlices, splitSlot);
+    List<HoodieClusteringGroup> ret = new ArrayList<>(splitResult.getLeft());
+
+    List<FileSlice> remainedSlices = splitResult.getRight();
+    if (isBucketClusteringMergeEnabled()) {
+      // Apply merge rule
+      int mergeSlot = identifier.getNumBuckets() - 
getWriteConfig().getBucketIndexMinNumBuckets() + splitResult.getMiddle();
+      Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> 
mergeResult =
+          buildMergeClusteringGroup(identifier, remainedSlices, mergeSlot);
+      ret.addAll(mergeResult.getLeft());
+      remainedSlices = mergeResult.getRight();
+    }
+    if (isBucketClusteringSortEnabled()) {
+      // Apply sort only to the remaining file groups
+      ret.addAll(remainedSlices.stream().map(fs -> {
+        ConsistentHashingNode oldNode = 
identifier.getBucketByFileId(fs.getFileId());
+        ConsistentHashingNode newNode = new 
ConsistentHashingNode(oldNode.getValue(), FSUtils.createNewFileIdPfx(), 
ConsistentHashingNode.NodeTag.REPLACE);
+        return HoodieClusteringGroup.newBuilder()
+            .setSlices(getFileSliceInfo(Collections.singletonList(fs)))
+            .setNumOutputFileGroups(1)
+            .setMetrics(buildMetrics(Collections.singletonList(fs)))
+            .setExtraMetadata(constructExtraMetadata(fs.getPartitionPath(), 
Collections.singletonList(newNode), identifier.getMetadata().getSeqNo()))
+            .build();
+      }).collect(Collectors.toList()));
+    }
+    return ret.stream();
+  }
+
+  /**
+   * Whether enable buckets merged when using consistent hashing bucket index.
+   *
+   * @return true if bucket merge is enabled, false otherwise.
+   */
+  protected boolean isBucketClusteringMergeEnabled() {
+    return true;
+  }
+
+  /**
+   * Whether generate regular sort clustering plans for buckets that are not 
involved in merge or split.
+   *
+   * @return true if generate regular sort clustering plans for buckets that 
are not involved in merge or split, false otherwise.
+   */
+  protected boolean isBucketClusteringSortEnabled() {
+    return true;
+  }
+
   /**
    * Generate candidate clustering file slices of the given partition.
    * If there is inflight / requested clustering working on the partition, 
then return empty list
@@ -121,42 +177,6 @@ public class SparkConsistentBucketClusteringPlanStrategy<T 
extends HoodieRecordP
     return params;
   }
 
-  /**
-   * Generate cluster group based on split, merge and sort rules
-   */
-  @Override
-  protected Stream<HoodieClusteringGroup> 
buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> 
fileSlices) {
-    Option<HoodieConsistentHashingMetadata> metadata = 
ConsistentBucketIndexUtils.loadMetadata(getHoodieTable(), partitionPath);
-    ValidationUtils.checkArgument(metadata.isPresent(), "Metadata is empty for 
partition: " + partitionPath);
-    ConsistentBucketIdentifier identifier = new 
ConsistentBucketIdentifier(metadata.get());
-
-    // Apply split rule
-    int splitSlot = getWriteConfig().getBucketIndexMaxNumBuckets() - 
identifier.getNumBuckets();
-    Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> splitResult =
-        buildSplitClusteringGroups(identifier, fileSlices, splitSlot);
-    List<HoodieClusteringGroup> ret = new ArrayList<>(splitResult.getLeft());
-
-    // Apply merge rule
-    int mergeSlot = identifier.getNumBuckets() - 
getWriteConfig().getBucketIndexMinNumBuckets() + splitResult.getMiddle();
-    Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> mergeResult =
-        buildMergeClusteringGroup(identifier, splitResult.getRight(), 
mergeSlot);
-    ret.addAll(mergeResult.getLeft());
-
-    // Apply sort only to the remaining file groups
-    ret.addAll(mergeResult.getRight().stream().map(fs -> {
-      ConsistentHashingNode oldNode = 
identifier.getBucketByFileId(fs.getFileId());
-      ConsistentHashingNode newNode = new 
ConsistentHashingNode(oldNode.getValue(), FSUtils.createNewFileIdPfx(), 
ConsistentHashingNode.NodeTag.REPLACE);
-      return HoodieClusteringGroup.newBuilder()
-          .setSlices(getFileSliceInfo(Collections.singletonList(fs)))
-          .setNumOutputFileGroups(1)
-          .setMetrics(buildMetrics(Collections.singletonList(fs)))
-          .setExtraMetadata(constructExtraMetadata(fs.getPartitionPath(), 
Collections.singletonList(newNode), identifier.getMetadata().getSeqNo()))
-          .build();
-    }).collect(Collectors.toList()));
-
-    return ret.stream();
-  }
-
   /**
    * Generate clustering groups according to split rules.
    * Currently, we always split bucket into two sub-buckets.
@@ -298,7 +318,7 @@ public class SparkConsistentBucketClusteringPlanStrategy<T 
extends HoodieRecordP
       extraMetadata.put(METADATA_CHILD_NODE_KEY, 
ConsistentHashingNode.toJsonString(nodes));
       extraMetadata.put(METADATA_SEQUENCE_NUMBER_KEY, Integer.toString(seqNo));
     } catch (IOException e) {
-      LOG.error("Failed to construct extra metadata, partition: " + partition 
+ ", nodes:" + nodes);
+      LOG.error("Failed to construct extra metadata, partition: {}, nodes:{}", 
partition, nodes);
       throw new HoodieClusteringException("Failed to construct extra metadata, 
partition: " + partition + ", nodes:" + nodes);
     }
     return extraMetadata;
@@ -313,9 +333,4 @@ public class SparkConsistentBucketClusteringPlanStrategy<T 
extends HoodieRecordP
     HoodieFileFormat format = 
getHoodieTable().getMetaClient().getTableConfig().getBaseFileFormat();
     return (long) (getWriteConfig().getMaxFileSize(format) * 
getWriteConfig().getBucketMergeThreshold());
   }
-
-  private void validate() {
-    ValidationUtils.checkArgument(getHoodieTable().getIndex() instanceof 
HoodieSparkConsistentBucketIndex,
-        "SparConsistentBucketClusteringPlanStrategy is only applicable to 
table with consistent hash index");
-  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 5f4e52be2fe..f9d5d69aec0 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -105,7 +105,7 @@ public class TestHoodieWriteConfig {
     testEngineSpecificConfig(HoodieWriteConfig::getClusteringPlanStrategyClass,
         constructConfigMap(
             EngineType.SPARK, 
HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY,
-            EngineType.FLINK, 
HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY,
+            EngineType.FLINK, 
HoodieClusteringConfig.FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY,
             EngineType.JAVA, 
HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY));
   }
 
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkConsistentBucketClusteringPlanStrategy.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkConsistentBucketClusteringPlanStrategy.java
new file mode 100644
index 00000000000..8a3ecf1916b
--- /dev/null
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkConsistentBucketClusteringPlanStrategy.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import 
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy;
+
+import java.util.List;
+
+/**
+ * Consistent hashing bucket index clustering plan for Flink engine.
+ */
+public class FlinkConsistentBucketClusteringPlanStrategy<T extends 
HoodieRecordPayload<T>>
+    extends BaseConsistentHashingBucketClusteringPlanStrategy<T, 
List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+
+  public FlinkConsistentBucketClusteringPlanStrategy(
+      HoodieTable table, HoodieEngineContext engineContext,
+      HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  @Override
+  protected boolean isBucketClusteringMergeEnabled() {
+    // Flink would not generate merge plan because it would cause multiple 
write sub tasks write to the same file group.
+    return false;
+  }
+
+  @Override
+  protected boolean isBucketClusteringSortEnabled() {
+    // Flink would not generate sort clustering plans for buckets that are not 
involved in merge or split to avoid unnecessary clustering costs.
+    return false;
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkConsistentBucketClusteringPlanStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkConsistentBucketClusteringPlanStrategy.java
index a49ab1ddf4b..bb3cc9f7aa7 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkConsistentBucketClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkConsistentBucketClusteringPlanStrategy.java
@@ -21,301 +21,39 @@ package org.apache.hudi.client.clustering.plan.strategy;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.ConsistentHashingNode;
 import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
-import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.TableFileSystemView;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Triple;
-import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
-import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
-import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
 import org.apache.hudi.table.HoodieTable;
-import 
org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
+import 
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy;
 
 import org.apache.spark.api.java.JavaRDD;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
 
 /**
- * Clustering plan strategy specifically for consistent bucket index
+ * Consistent hashing bucket index clustering plan for spark engine.
  */
 public class SparkConsistentBucketClusteringPlanStrategy<T extends 
HoodieRecordPayload<T>>
-    extends PartitionAwareClusteringPlanStrategy<T, JavaRDD<HoodieRecord<T>>, 
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(SparkConsistentBucketClusteringPlanStrategy.class);
-
-  public static final String METADATA_PARTITION_KEY = 
"clustering.group.partition";
-  public static final String METADATA_CHILD_NODE_KEY = 
"clustering.group.child.node";
-  public static final String METADATA_SEQUENCE_NUMBER_KEY = 
"clustering.group.sequence.no";
+    extends BaseConsistentHashingBucketClusteringPlanStrategy<T, 
JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
 
   public SparkConsistentBucketClusteringPlanStrategy(HoodieTable table, 
HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
     super(table, engineContext, writeConfig);
-    validate();
-  }
-
-  /**
-   * TODO maybe add force config to schedule the clustering. It could allow 
clustering on partitions that are not doing write operation.
-   * Block clustering if there is any ongoing concurrent writers
-   *
-   * @return true if the schedule can proceed
-   */
-  @Override
-  public boolean checkPrecondition() {
-    HoodieTimeline timeline = 
getHoodieTable().getActiveTimeline().getDeltaCommitTimeline().filterInflightsAndRequested();
-    if (!timeline.empty()) {
-      LOG.warn("When using consistent bucket, clustering cannot be scheduled 
async if there are concurrent writers. "
-          + "Writer instant: " + timeline.getInstants());
-      return false;
-    }
-    return true;
   }
 
-  /**
-   * Generate candidate clustering file slices of the given partition.
-   * If there is inflight / requested clustering working on the partition, 
then return empty list
-   * to ensure serialized update to the hashing metadata.
-   *
-   * @return candidate file slices to be clustered (i.e., sort, bucket split 
or merge)
-   */
   @Override
-  protected Stream<FileSlice> getFileSlicesEligibleForClustering(String 
partition) {
-    TableFileSystemView fileSystemView = getHoodieTable().getFileSystemView();
-    boolean isPartitionInClustering = 
fileSystemView.getFileGroupsInPendingClustering().anyMatch(p -> 
p.getLeft().getPartitionPath().equals(partition));
-    if (isPartitionInClustering) {
-      LOG.info("Partition: " + partition + " is already in clustering, skip");
-      return Stream.empty();
-    }
-
-    return super.getFileSlicesEligibleForClustering(partition);
-  }
-
-  @Override
-  protected Map<String, String> getStrategyParams() {
-    Map<String, String> params = new HashMap<>();
-    if 
(!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) {
-      params.put(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(), 
getWriteConfig().getClusteringSortColumns());
-    }
-    return params;
+  protected Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> 
buildMergeClusteringGroup(
+      ConsistentBucketIdentifier identifier, List<FileSlice> fileSlices, int 
mergeSlot) {
+    return super.buildMergeClusteringGroup(identifier, fileSlices, mergeSlot);
   }
 
-  /**
-   * Generate cluster group based on split, merge and sort rules
-   */
   @Override
-  protected Stream<HoodieClusteringGroup> 
buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> 
fileSlices) {
-    Option<HoodieConsistentHashingMetadata> metadata = 
ConsistentBucketIndexUtils.loadMetadata(getHoodieTable(), partitionPath);
-    ValidationUtils.checkArgument(metadata.isPresent(), "Metadata is empty for 
partition: " + partitionPath);
-    ConsistentBucketIdentifier identifier = new 
ConsistentBucketIdentifier(metadata.get());
-
-    // Apply split rule
-    int splitSlot = getWriteConfig().getBucketIndexMaxNumBuckets() - 
identifier.getNumBuckets();
-    Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> splitResult =
-        buildSplitClusteringGroups(identifier, fileSlices, splitSlot);
-    List<HoodieClusteringGroup> ret = new ArrayList<>(splitResult.getLeft());
-
-    // Apply merge rule
-    int mergeSlot = identifier.getNumBuckets() - 
getWriteConfig().getBucketIndexMinNumBuckets() + splitResult.getMiddle();
-    Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> mergeResult =
-        buildMergeClusteringGroup(identifier, splitResult.getRight(), 
mergeSlot);
-    ret.addAll(mergeResult.getLeft());
-
-    // Apply sort only to the remaining file groups
-    ret.addAll(mergeResult.getRight().stream().map(fs -> {
-      ConsistentHashingNode oldNode = 
identifier.getBucketByFileId(fs.getFileId());
-      ConsistentHashingNode newNode = new 
ConsistentHashingNode(oldNode.getValue(), FSUtils.createNewFileIdPfx(), 
ConsistentHashingNode.NodeTag.REPLACE);
-      return HoodieClusteringGroup.newBuilder()
-          .setSlices(getFileSliceInfo(Collections.singletonList(fs)))
-          .setNumOutputFileGroups(1)
-          .setMetrics(buildMetrics(Collections.singletonList(fs)))
-          .setExtraMetadata(constructExtraMetadata(fs.getPartitionPath(), 
Collections.singletonList(newNode), identifier.getMetadata().getSeqNo()))
-          .build();
-    }).collect(Collectors.toList()));
-
-    return ret.stream();
-  }
-
-  /**
-   * Generate clustering groups according to split rules.
-   * Currently, we always split bucket into two sub-buckets.
-   *
-   * @param identifier bucket identifier
-   * @param fileSlices file slice candidate to be built as split clustering 
groups
-   * @param splitSlot  number of new bucket allowed to produce, in order to 
constrain the upper bound of the total number of bucket
-   * @return list of clustering group, number of new buckets generated, 
remaining file slice (that does not split)
-   */
   protected Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> 
buildSplitClusteringGroups(
       ConsistentBucketIdentifier identifier, List<FileSlice> fileSlices, int 
splitSlot) {
-    List<HoodieClusteringGroup> retGroup = new ArrayList<>();
-    List<FileSlice> fsUntouched = new ArrayList<>();
-    long splitSize = getSplitSize();
-    int remainingSplitSlot = splitSlot;
-    for (FileSlice fs : fileSlices) {
-      boolean needSplit = fs.getTotalFileSize() > splitSize;
-      if (!needSplit || remainingSplitSlot == 0) {
-        fsUntouched.add(fs);
-        continue;
-      }
-
-      Option<List<ConsistentHashingNode>> nodes = 
identifier.splitBucket(fs.getFileId());
-
-      // Bucket cannot be split
-      if (!nodes.isPresent()) {
-        fsUntouched.add(fs);
-        continue;
-      }
-
-      remainingSplitSlot--;
-      List<FileSlice> fsList = Collections.singletonList(fs);
-      retGroup.add(HoodieClusteringGroup.newBuilder()
-          .setSlices(getFileSliceInfo(fsList))
-          .setNumOutputFileGroups(2)
-          .setMetrics(buildMetrics(fsList))
-          .setExtraMetadata(constructExtraMetadata(fs.getPartitionPath(), 
nodes.get(), identifier.getMetadata().getSeqNo()))
-          .build());
-    }
-    return Triple.of(retGroup, splitSlot - remainingSplitSlot, fsUntouched);
-  }
-
-  /**
-   * Generate clustering group according to merge rules
-   *
-   * @param identifier bucket identifier
-   * @param fileSlices file slice candidates to be built as merge clustering 
groups
-   * @param mergeSlot  number of bucket allowed to be merged, in order to 
guarantee the lower bound of the total number of bucket
-   * @return list of clustering group, number of buckets merged (removed), 
remaining file slice (that does not be merged)
-   */
-  protected Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> 
buildMergeClusteringGroup(
-      ConsistentBucketIdentifier identifier, List<FileSlice> fileSlices, int 
mergeSlot) {
-    if (fileSlices.size() <= 1) {
-      return Triple.of(Collections.emptyList(), 0, fileSlices);
-    }
-
-    long mergeSize = getMergeSize();
-    int remainingMergeSlot = mergeSlot;
-    List<HoodieClusteringGroup> groups = new ArrayList<>();
-    boolean[] added = new boolean[fileSlices.size()];
-
-    fileSlices.sort(Comparator.comparingInt(a -> 
identifier.getBucketByFileId(a.getFileId()).getValue()));
-    // In each round, we check if the ith file slice can be merged with its 
predecessors and successors
-    for (int i = 0; i < fileSlices.size(); ++i) {
-      if (added[i] || fileSlices.get(i).getTotalFileSize() > mergeSize) {
-        continue;
-      }
-
-      // 0: startIdx, 1: endIdx
-      int[] rangeIdx = {i, i};
-      long totalSize = fileSlices.get(i).getTotalFileSize();
-      // Do backward check first (k == 0), and then forward check (k == 1)
-      for (int k = 0; k < 2; ++k) {
-        boolean forward = k == 1;
-        do {
-          int nextIdx = forward ? (rangeIdx[k] + 1 < fileSlices.size() ? 
rangeIdx[k] + 1 : 0) : (rangeIdx[k] >= 1 ? rangeIdx[k] - 1 : fileSlices.size() 
- 1);
-          boolean isNeighbour = 
identifier.getBucketByFileId(fileSlices.get(nextIdx).getFileId()) == 
identifier.getFormerBucket(fileSlices.get(rangeIdx[k]).getFileId());
-          /**
-           * Merge condition:
-           * 1. there is still slot to merge bucket
-           * 2. the previous file slices is not merged
-           * 3. the previous file slice and current file slice are neighbour 
in the hash ring
-           * 4. Both the total file size up to now and the previous file slice 
size are smaller than merge size threshold
-           */
-          if (remainingMergeSlot == 0 || added[nextIdx] || !isNeighbour || 
totalSize > mergeSize || fileSlices.get(nextIdx).getTotalFileSize() > 
mergeSize) {
-            break;
-          }
-
-          // Mark preIdx as merge candidate
-          totalSize += fileSlices.get(nextIdx).getTotalFileSize();
-          rangeIdx[k] = nextIdx;
-          remainingMergeSlot--;
-        } while (rangeIdx[k] != i);
-      }
-
-      int startIdx = rangeIdx[0];
-      int endIdx = rangeIdx[1];
-      if (endIdx == i && startIdx == i) {
-        continue;
-      }
-
-      // Construct merge group if there is at least two file slices
-      List<FileSlice> fs = new ArrayList<>();
-      while (true) {
-        added[startIdx] = true;
-        fs.add(fileSlices.get(startIdx));
-        if (startIdx == endIdx) {
-          break;
-        }
-        startIdx = startIdx + 1 < fileSlices.size() ? startIdx + 1 : 0;
-      }
-
-      groups.add(HoodieClusteringGroup.newBuilder()
-          .setSlices(getFileSliceInfo(fs))
-          .setNumOutputFileGroups(1)
-          .setMetrics(buildMetrics(fs))
-          .setExtraMetadata(
-              constructExtraMetadata(
-                  fs.get(0).getPartitionPath(),
-                  
identifier.mergeBucket(fs.stream().map(FileSlice::getFileId).collect(Collectors.toList())),
-                  identifier.getMetadata().getSeqNo()))
-          .build());
-    }
-
-    // Collect file slices that are not involved in merge
-    List<FileSlice> fsUntouched = IntStream.range(0, 
fileSlices.size()).filter(i -> !added[i])
-        .mapToObj(fileSlices::get).collect(Collectors.toList());
-
-    return Triple.of(groups, mergeSlot - remainingMergeSlot, fsUntouched);
-  }
-
-  /**
-   * Construct extra metadata for clustering group
-   */
-  private Map<String, String> constructExtraMetadata(String partition, 
List<ConsistentHashingNode> nodes, int seqNo) {
-    Map<String, String> extraMetadata = new HashMap<>();
-    try {
-      extraMetadata.put(METADATA_PARTITION_KEY, partition);
-      extraMetadata.put(METADATA_CHILD_NODE_KEY, 
ConsistentHashingNode.toJsonString(nodes));
-      extraMetadata.put(METADATA_SEQUENCE_NUMBER_KEY, Integer.toString(seqNo));
-    } catch (IOException e) {
-      LOG.error("Failed to construct extra metadata, partition: " + partition 
+ ", nodes:" + nodes);
-      throw new HoodieClusteringException("Failed to construct extra metadata, 
partition: " + partition + ", nodes:" + nodes);
-    }
-    return extraMetadata;
-  }
-
-  private long getSplitSize() {
-    HoodieFileFormat format = 
getHoodieTable().getMetaClient().getTableConfig().getBaseFileFormat();
-    return (long) (getWriteConfig().getMaxFileSize(format) * 
getWriteConfig().getBucketSplitThreshold());
-  }
-
-  private long getMergeSize() {
-    HoodieFileFormat format = 
getHoodieTable().getMetaClient().getTableConfig().getBaseFileFormat();
-    return (long) (getWriteConfig().getMaxFileSize(format) * 
getWriteConfig().getBucketMergeThreshold());
-  }
-
-  private void validate() {
-    ValidationUtils.checkArgument(getHoodieTable().getIndex() instanceof 
HoodieSparkConsistentBucketIndex,
-        "SparConsistentBucketClusteringPlanStrategy is only applicable to 
table with consistent hash index");
+    return super.buildSplitClusteringGroups(identifier, fileSlices, splitSlot);
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
index 6469b3108a6..698136d9b20 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.client.clustering.run.strategy;
 
 import org.apache.hudi.client.WriteStatus;
-import 
org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.ConsistentHashingNode;
@@ -30,6 +29,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieClusteringException;
 import 
org.apache.hudi.execution.bulkinsert.RDDConsistentBucketBulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
+import 
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy;
 import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
 
 import org.apache.avro.Schema;
@@ -80,8 +80,8 @@ public class 
SparkConsistentBucketClusteringExecutionStrategy<T extends HoodieRe
 
     RDDConsistentBucketBulkInsertPartitioner<T> partitioner = new 
RDDConsistentBucketBulkInsertPartitioner<>(getHoodieTable(), strategyParams, 
preserveHoodieMetadata);
     try {
-      List<ConsistentHashingNode> nodes = 
ConsistentHashingNode.fromJsonString(extraMetadata.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY));
-      
partitioner.addHashingChildrenNodes(extraMetadata.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY),
 nodes);
+      List<ConsistentHashingNode> nodes = 
ConsistentHashingNode.fromJsonString(extraMetadata.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY));
+      
partitioner.addHashingChildrenNodes(extraMetadata.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY),
 nodes);
     } catch (Exception e) {
       LOG.error("Failed to add hashing children nodes", e);
       throw new HoodieClusteringException("Failed to add hashing children 
nodes", e);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
index 4ddacc7f3cb..7d13a5940b4 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client.clustering.update.strategy;
 
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
-import 
org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
@@ -40,6 +39,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
 import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
 import org.apache.hudi.table.HoodieTable;
+import 
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy;
 import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
 
 import org.slf4j.Logger;
@@ -121,7 +121,7 @@ public class SparkConsistentBucketDuplicateUpdateStrategy<T 
extends HoodieRecord
                                                         Map<String, 
HoodieConsistentHashingMetadata> partitionToHashingMeta, Map<String, String> 
partitionToInstant) {
     for (HoodieClusteringGroup group : plan.getInputGroups()) {
       Map<String, String> groupMeta = group.getExtraMetadata();
-      String p = 
groupMeta.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+      String p = 
groupMeta.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
       checkState(p != null, "Clustering plan does not has partition info, 
plan: " + plan);
       // Skip unrelated clustering group
       if (!recordPartitions.contains(p)) {
@@ -137,7 +137,7 @@ public class SparkConsistentBucketDuplicateUpdateStrategy<T 
extends HoodieRecord
       }
 
       try {
-        String nodeJson = 
group.getExtraMetadata().get(SparkConsistentBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY);
+        String nodeJson = 
group.getExtraMetadata().get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY);
         List<ConsistentHashingNode> nodes = 
ConsistentHashingNode.fromJsonString(nodeJson);
         partitionToHashingMeta.get(p).getChildrenNodes().addAll(nodes);
       } catch (Exception e) {
@@ -146,5 +146,4 @@ public class SparkConsistentBucketDuplicateUpdateStrategy<T 
extends HoodieRecord
       }
     }
   }
-
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
index b9a77e56f67..b49599b4c51 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
@@ -21,7 +21,6 @@ package org.apache.hudi.index.bucket;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.WriteStatus;
-import 
org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.ConsistentHashingNode;
@@ -36,6 +35,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.table.HoodieTable;
+import 
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -79,16 +79,16 @@ public class HoodieSparkConsistentBucketIndex extends 
HoodieConsistentBucketInde
 
     HoodieClusteringPlan plan = instantPlanPair.get().getRight();
     
HoodieJavaRDD.getJavaRDD(context.parallelize(plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList())))
-        .mapToPair(m -> new 
Tuple2<>(m.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY),
 m)
+        .mapToPair(m -> new 
Tuple2<>(m.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY),
 m)
     ).groupByKey().foreach((input) -> {
       // Process each partition
       String partition = input._1();
       List<ConsistentHashingNode> childNodes = new ArrayList<>();
       int seqNo = 0;
       for (Map<String, String> m: input._2()) {
-        String nodesJson = 
m.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY);
+        String nodesJson = 
m.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY);
         childNodes.addAll(ConsistentHashingNode.fromJsonString(nodesJson));
-        seqNo = 
Integer.parseInt(m.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_SEQUENCE_NUMBER_KEY));
+        seqNo = 
Integer.parseInt(m.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_SEQUENCE_NUMBER_KEY));
       }
 
       Option<HoodieConsistentHashingMetadata> metadataOption = 
ConsistentBucketIndexUtils.loadMetadata(hoodieTable, partition);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java
index 7c03a02f5fc..8760f63c9f1 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java
@@ -36,6 +36,7 @@ import 
org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
+import 
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 
 import org.junit.jupiter.api.AfterEach;
@@ -152,7 +153,7 @@ public class 
TestSparkConsistentBucketClusteringPlanStrategy extends HoodieClien
     Assertions.assertEquals(fileSlices.get(7).getFileId(), 
groups.get(0).getSlices().get(1).getFileId());
     Assertions.assertEquals(fileSlices.get(6).getFileId(), 
groups.get(0).getSlices().get(0).getFileId());
     Assertions.assertEquals(3, groups.get(0).getSlices().size());
-    List<ConsistentHashingNode> nodes = 
ConsistentHashingNode.fromJsonString(groups.get(0).getExtraMetadata().get(SparkConsistentBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY));
+    List<ConsistentHashingNode> nodes = 
ConsistentHashingNode.fromJsonString(groups.get(0).getExtraMetadata().get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY));
     Assertions.assertEquals(3, nodes.size());
     Assertions.assertEquals(ConsistentHashingNode.NodeTag.DELETE, 
nodes.get(0).getTag());
     Assertions.assertEquals(ConsistentHashingNode.NodeTag.DELETE, 
nodes.get(1).getTag());
@@ -163,7 +164,7 @@ public class 
TestSparkConsistentBucketClusteringPlanStrategy extends HoodieClien
     Assertions.assertEquals(fileSlices.get(2).getFileId(), 
groups.get(1).getSlices().get(0).getFileId());
     Assertions.assertEquals(fileSlices.get(3).getFileId(), 
groups.get(1).getSlices().get(1).getFileId());
     Assertions.assertEquals(2, groups.get(1).getSlices().size());
-    nodes = 
ConsistentHashingNode.fromJsonString(groups.get(1).getExtraMetadata().get(SparkConsistentBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY));
+    nodes = 
ConsistentHashingNode.fromJsonString(groups.get(1).getExtraMetadata().get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY));
     Assertions.assertEquals(2, nodes.size());
     Assertions.assertEquals(ConsistentHashingNode.NodeTag.DELETE, 
nodes.get(0).getTag());
     Assertions.assertEquals(ConsistentHashingNode.NodeTag.REPLACE, 
nodes.get(1).getTag());
@@ -192,5 +193,4 @@ public class 
TestSparkConsistentBucketClusteringPlanStrategy extends HoodieClien
 
     return fs;
   }
-
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 4b90ab4c73a..e7a6901b0b2 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -140,6 +140,10 @@ public class OptionsResolver {
     return HoodieIndex.BucketIndexEngineType.valueOf(bucketEngineType);
   }
 
+  public static boolean isConsistentHashingBucketIndexType(Configuration conf) 
{
+    return isBucketIndexType(conf) && 
getBucketEngineType(conf).equals(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING);
+  }
+
   /**
    * Returns whether the source should emit changelog.
    *
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 474e2b18e60..900ccc77670 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -109,7 +109,7 @@ public class Pipelines {
     WriteOperatorFactory<RowData> operatorFactory = 
BulkInsertWriteOperator.getFactory(conf, rowType);
     if (OptionsResolver.isBucketIndexType(conf)) {
       // TODO support bulk insert for consistent bucket index
-      if (OptionsResolver.getBucketEngineType(conf) == 
HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING) {
+      if (OptionsResolver.isConsistentHashingBucketIndexType(conf)) {
         throw new HoodieException(
             "Consistent hashing bucket index does not work with bulk insert 
using FLINK engine. Use simple bucket index or Spark engine.");
       }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
index b183ba3a4b0..2d47bb8a1b4 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
@@ -25,9 +25,13 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieFlinkTable;
+import 
org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy;
 
 import org.apache.flink.configuration.Configuration;
 import org.slf4j.Logger;
@@ -46,7 +50,19 @@ public class ClusteringUtil {
 
   public static void validateClusteringScheduling(Configuration conf) {
     if (OptionsResolver.isBucketIndexType(conf)) {
-      throw new UnsupportedOperationException("Clustering is not supported for 
bucket index.");
+      HoodieIndex.BucketIndexEngineType bucketIndexEngineType = 
OptionsResolver.getBucketEngineType(conf);
+      switch (bucketIndexEngineType) {
+        case SIMPLE:
+          throw new UnsupportedOperationException("Clustering is not supported 
for simple bucket index.");
+        case CONSISTENT_HASHING:
+          if 
(!conf.get(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS).equalsIgnoreCase(FlinkConsistentBucketClusteringPlanStrategy.class.getName()))
 {
+            throw new HoodieNotSupportedException(
+                "CLUSTERING_PLAN_STRATEGY_CLASS should be set to " + 
FlinkConsistentBucketClusteringPlanStrategy.class.getName() + " in order to 
work with Consistent Hashing Bucket Index.");
+          }
+          break;
+        default:
+          throw new HoodieNotSupportedException("Unknown bucket index engine 
type: " + bucketIndexEngineType);
+      }
     }
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index c039cbaacd8..1835e9c894b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -23,8 +23,8 @@ import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -40,6 +40,7 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
+import 
org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -163,7 +164,7 @@ public class FlinkWriteClients {
             .withClusteringConfig(
                 HoodieClusteringConfig.newBuilder()
                     
.withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED))
-                    
.withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS))
+                    
.withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS,
 getDefaultPlanStrategyClassName(conf)))
                     .withClusteringPlanPartitionFilterMode(
                         
ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME)))
                     
.withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS))
@@ -245,4 +246,9 @@ public class FlinkWriteClients {
     }
     return writeConfig;
   }
+
+  private static String getDefaultPlanStrategyClassName(Configuration conf) {
+    return OptionsResolver.isConsistentHashingBucketIndexType(conf) ? 
FlinkConsistentBucketClusteringPlanStrategy.class.getName() :
+        FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS.defaultValue();
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 6cfa0abd218..5342d7b4192 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -160,6 +160,7 @@ public class StreamerUtil {
         
.withBucketNum(String.valueOf(conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS)))
         .withRecordKeyField(conf.getString(FlinkOptions.RECORD_KEY_FIELD))
         .withIndexKeyField(OptionsResolver.getIndexKeyField(conf))
+        .withBucketIndexEngineType(OptionsResolver.getBucketEngineType(conf))
         .withEngineType(EngineType.FLINK)
         .build();
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestFlinkConsistentHashingClustering.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestFlinkConsistentHashingClustering.java
new file mode 100644
index 00000000000..e52fe8b976a
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestFlinkConsistentHashingClustering.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.cluster;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
+import org.apache.hudi.table.HoodieFlinkTable;
+import 
org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestSQL;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration tests for scheduling clustering plans through the Flink write client on a
+ * MERGE_ON_READ table that uses the BUCKET index with the CONSISTENT_HASHING engine.
+ *
+ * <p>Only plan scheduling is exercised here: the configured execution strategy is the Spark
+ * one, and no test in this class executes a clustering plan.
+ */
+public class ITTestFlinkConsistentHashingClustering {
+
+  // Expected written data, presumably keyed by partition path ("" is the only key used here).
+  // NOTE(review): unlike EXPECTED_AFTER_UPSERT, the value registered below has no opening '['
+  // and ends with ", ]" -- confirm that this is intended.
+  private static final Map<String, String> EXPECTED_AFTER_INITIAL_INSERT = new HashMap<>();
+  // NOTE(review): populated below but not referenced by any test in this class.
+  private static final Map<String, String> EXPECTED_AFTER_UPSERT = new HashMap<>();
+
+  static {
+    EXPECTED_AFTER_INITIAL_INSERT.put("", "id1,,id1,Danny,23,1000,, id2,,id2,Stephen,33,2000,, "
+        + "id3,,id3,Julian,53,3000,, id4,,id4,Fabian,31,4000,, id5,,id5,Sophia,18,5000,, "
+        + "id6,,id6,Emma,20,6000,, id7,,id7,Bob,44,7000,, id8,,id8,Han,56,8000,, ]");
+    EXPECTED_AFTER_UPSERT.put("", "[id1,,id1,Danny,24,1000,, id2,,id2,Stephen,34,2000,, id3,,id3,Julian,54,3000,, "
+        + "id4,,id4,Fabian,32,4000,, id5,,id5,Sophia,18,5000,, id6,,id6,Emma,20,6000,, "
+        + "id7,,id7,Bob,44,7000,, id8,,id8,Han,56,8000,, id9,,id9,Jane,19,6000,, "
+        + "id10,,id10,Ella,38,7000,, id11,,id11,Phoebe,52,8000,,]");
+  }
+
+  // Base directory of the test table; injected by JUnit for every test.
+  @TempDir
+  File tempFile;
+
+  /**
+   * With a tiny max file size and split threshold, scheduling must produce a clustering plan
+   * that has four input groups, each holding exactly one file slice.
+   */
+  @Test
+  public void testScheduleSplitPlan() throws Exception {
+    TableEnvironment tableEnv = setupTableEnv();
+    prepareData(tableEnv);
+
+    Configuration conf = getDefaultConfiguration();
+    conf.setString(HoodieIndexConfig.BUCKET_INDEX_MIN_NUM_BUCKETS.key(), "4");
+    conf.setString(HoodieIndexConfig.BUCKET_INDEX_MAX_NUM_BUCKETS.key(), "8");
+    // Manually set the split threshold to trigger split in the clustering
+    conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+    conf.setString(HoodieIndexConfig.BUCKET_SPLIT_THRESHOLD.key(), String.valueOf(1 / 1024.0 / 1024.0));
+    // Close the write client once the assertions are done so its resources do not leak.
+    try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf)) {
+      Option<String> clusteringInstantOption = writeClient.scheduleClustering(Option.empty());
+      Assertions.assertTrue(clusteringInstantOption.isPresent());
+
+      // Validate clustering plan
+      HoodieClusteringPlan clusteringPlan = getLatestClusteringPlan(writeClient);
+      Assertions.assertEquals(4, clusteringPlan.getInputGroups().size());
+      clusteringPlan.getInputGroups()
+          .forEach(group -> Assertions.assertEquals(1, group.getSlices().size()));
+    }
+  }
+
+  /**
+   * With the default test configuration, scheduling is expected to yield no clustering instant.
+   */
+  @Test
+  public void testScheduleMergePlan() throws Exception {
+    TableEnvironment tableEnv = setupTableEnv();
+    prepareData(tableEnv);
+
+    Configuration conf = getDefaultConfiguration();
+    // Close the write client once the assertion is done so its resources do not leak.
+    try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf)) {
+      Option<String> clusteringInstantOption = writeClient.scheduleClustering(Option.empty());
+      Assertions.assertFalse(clusteringInstantOption.isPresent());
+    }
+  }
+
+  /**
+   * Reloads the active timeline and reads the clustering plan of the last pending replace instant.
+   *
+   * @param writeClient the client whose table is inspected
+   * @return the clustering plan attached to the last pending replace instant
+   */
+  private HoodieClusteringPlan getLatestClusteringPlan(HoodieFlinkWriteClient<?> writeClient) {
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+    table.getMetaClient().reloadActiveTimeline();
+    Option<HoodieInstant> lastPendingReplace =
+        table.getMetaClient().getActiveTimeline().filterPendingReplaceTimeline().lastInstant();
+    // Fail with a readable message instead of an exception from Option#get.
+    Assertions.assertTrue(lastPendingReplace.isPresent(), "No pending replace instant found on the active timeline");
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption =
+        ClusteringUtils.getClusteringPlan(table.getMetaClient(), lastPendingReplace.get());
+    Assertions.assertTrue(clusteringPlanOption.isPresent(), "No clustering plan found for the pending replace instant");
+    return clusteringPlanOption.get().getRight();
+  }
+
+  /**
+   * Creates table {@code t1} with the consistent hashing options, runs {@code TestSQL.INSERT_T1}
+   * and checks the written data.
+   *
+   * @param tableEnv the table environment used to execute the DDL and the insert
+   */
+  private void prepareData(TableEnvironment tableEnv) throws Exception {
+    // Insert initial data
+    Map<String, String> options = getDefaultConsistentHashingOption();
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options, false, "");
+    tableEnv.executeSql(hoodieTableDDL);
+    tableEnv.executeSql(TestSQL.INSERT_T1).await();
+    // NOTE(review): fixed pause after the insert job has been awaited; the reason is not evident here.
+    TimeUnit.SECONDS.sleep(3);
+
+    // Validate the insertion
+    TestData.checkWrittenData(tempFile, EXPECTED_AFTER_INITIAL_INSERT, 0);
+  }
+
+  /**
+   * Builds a batch-mode table environment with default parallelism 4.
+   *
+   * @return the configured table environment
+   */
+  private TableEnvironment setupTableEnv() {
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+    return tableEnv;
+  }
+
+  /**
+   * Builds the Flink configuration used to create the write client: starts from a
+   * {@code FlinkClusteringConfig} pointing at the temp directory, copies the table name, record key
+   * field and partition field from the existing table config, applies the consistent hashing
+   * options and finally calls {@code CompactionUtil.setAvroSchema}.
+   *
+   * @return the configuration for the table under the temp directory
+   */
+  private Configuration getDefaultConfiguration() throws Exception {
+    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
+    for (Map.Entry<String, String> e : getDefaultConsistentHashingOption().entrySet()) {
+      conf.setString(e.getKey(), e.getValue());
+    }
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    return conf;
+  }
+
+  /**
+   * Returns the table options shared by the DDL and the write client configuration: a
+   * MERGE_ON_READ upsert table with a consistent hashing bucket index of 4 buckets, the Flink
+   * consistent bucket plan strategy, and async compaction / clustering scheduling switched off.
+   *
+   * @return a fresh, mutable option map
+   */
+  private Map<String, String> getDefaultConsistentHashingOption() {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+    options.put(FlinkOptions.OPERATION.key(), WriteOperationType.UPSERT.name());
+    options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
+    options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name());
+    options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "4");
+    options.put(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS.key(), FlinkConsistentBucketClusteringPlanStrategy.class.getName());
+    // Flink currently only supports scheduling; the clustering execution has to be done by the Spark engine.
+    options.put(HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME.key(),
+        "org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy");
+
+    // Disable compaction/clustering by default
+    options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
+    options.put(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED.key(), "false");
+
+    return options;
+  }
+}

Reply via email to