This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c20db99a7b [HUDI-2207] Support independent flink hudi clustering 
function
     new 18635b533e Merge pull request #3599 from yuzhaojing/HUDI-2207
c20db99a7b is described below

commit c20db99a7b79b9545e6fae0d762250ceb55a8b79
Author: 喻兆靖 <[email protected]>
AuthorDate: Sat May 21 21:25:15 2022 +0800

    [HUDI-2207] Support independent flink hudi clustering function
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |   2 +-
 .../apache/hudi/config/HoodieClusteringConfig.java |   2 +
 .../apache/hudi/table/BulkInsertPartitioner.java   |  16 +-
 .../cluster/strategy/ClusteringPlanStrategy.java   |  11 +
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  68 +++++
 .../FlinkRecentDaysClusteringPlanStrategy.java     |  65 +++++
 ...nkSelectedPartitionsClusteringPlanStrategy.java |  67 +++++
 .../FlinkSizeBasedClusteringPlanStrategy.java      | 129 +++++++++
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |   3 +-
 .../JavaCustomColumnsSortPartitioner.java          |   2 +-
 .../bulkinsert/JavaGlobalSortPartitioner.java      |   2 +-
 .../apache/hudi/configuration/FlinkOptions.java    |  67 +++++
 .../hudi/sink/bulk/sort/SortOperatorGen.java       |   2 +-
 .../sink/clustering/ClusteringCommitEvent.java     |  77 +++++
 .../hudi/sink/clustering/ClusteringCommitSink.java | 174 +++++++++++
 .../hudi/sink/clustering/ClusteringOperator.java   | 318 +++++++++++++++++++++
 .../hudi/sink/clustering/ClusteringPlanEvent.java  |  73 +++++
 .../clustering/ClusteringPlanSourceFunction.java   |  91 ++++++
 .../sink/clustering/FlinkClusteringConfig.java     | 148 ++++++++++
 .../sink/clustering/HoodieFlinkClusteringJob.java  | 191 +++++++++++++
 .../apache/hudi/streamer/FlinkStreamerConfig.java  |   2 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |  20 +-
 .../sink/cluster/ITTestHoodieFlinkClustering.java  | 184 ++++++++++++
 23 files changed, 1700 insertions(+), 14 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 270027df18..251ff97799 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1379,7 +1379,7 @@ public abstract class BaseHoodieWriteClient<T extends 
HoodieRecordPayload, I, K,
     return scheduleClustering(extraMetadata);
   }
 
-  protected void rollbackInflightClustering(HoodieInstant inflightInstant, 
HoodieTable table) {
+  public void rollbackInflightClustering(HoodieInstant inflightInstant, 
HoodieTable table) {
     Option<HoodiePendingRollbackInfo> pendingRollbackInstantInfo = 
getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), 
false);
     String commitTime = pendingRollbackInstantInfo.map(entry -> 
entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
     table.scheduleRollback(context, commitTime, inflightInstant, false, 
config.shouldRollbackUsingMarkers());
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index eee6f4f492..1180845a6e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -51,6 +51,8 @@ public class HoodieClusteringConfig extends HoodieConfig {
   public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = 
"hoodie.clustering.plan.strategy.";
   public static final String SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
       
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy";
+  public static final String FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
+      
"org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy";
   public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
       
"org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy";
   public static final String SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY =
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
index 63b502531a..89360c2474 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
@@ -25,20 +25,20 @@ import org.apache.hudi.io.WriteHandleFactory;
 import java.io.Serializable;
 
 /**
- * Repartition input records into at least expected number of output spark 
partitions. It should give below guarantees -
- * Output spark partition will have records from only one hoodie partition. - 
Average records per output spark
- * partitions should be almost equal to (#inputRecords / 
#outputSparkPartitions) to avoid possible skews.
+ * Repartition input records into at least expected number of output 
partitions. It should give below guarantees -
+ * Output partition will have records from only one hoodie partition. - 
Average records per output
+ * partitions should be almost equal to (#inputRecords / #outputPartitions) to 
avoid possible skews.
  */
 public interface BulkInsertPartitioner<I> extends Serializable {
 
   /**
-   * Repartitions the input records into at least expected number of output 
spark partitions.
+   * Repartitions the input records into at least expected number of output 
partitions.
    *
-   * @param records               Input Hoodie records
-   * @param outputSparkPartitions Expected number of output partitions
+   * @param records          Input Hoodie records
+   * @param outputPartitions Expected number of output partitions
    * @return
    */
-  I repartitionRecords(I records, int outputSparkPartitions);
+  I repartitionRecords(I records, int outputPartitions);
 
   /**
    * @return {@code true} if the records within a partition are sorted; {@code 
false} otherwise.
@@ -48,6 +48,7 @@ public interface BulkInsertPartitioner<I> extends 
Serializable {
   /**
    * Return file group id prefix for the given data partition.
    * By defauult, return a new file group id prefix, so that incoming records 
will route to a fresh new file group
+   *
    * @param partitionId data partition
    * @return
    */
@@ -57,6 +58,7 @@ public interface BulkInsertPartitioner<I> extends 
Serializable {
 
   /**
    * Return write handle factory for the given partition.
+   *
    * @param partitionId data partition
    * @return
    */
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
index 479f63932c..a96ff73947 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
@@ -70,6 +70,9 @@ public abstract class ClusteringPlanStrategy<T extends 
HoodieRecordPayload,I,K,O
     String sparkSizeBasedClassName = 
HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
     String sparkSelectedPartitionsClassName = 
"org.apache.hudi.client.clustering.plan.strategy.SparkSelectedPartitionsClusteringPlanStrategy";
     String sparkRecentDaysClassName = 
"org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy";
+    String flinkSizeBasedClassName = 
HoodieClusteringConfig.FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
+    String flinkSelectedPartitionsClassName = 
"org.apache.hudi.client.clustering.plan.strategy.FlinkSelectedPartitionsClusteringPlanStrategy";
+    String flinkRecentDaysClassName = 
"org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy";
     String javaSelectedPartitionClassName = 
"org.apache.hudi.client.clustering.plan.strategy.JavaRecentDaysClusteringPlanStrategy";
     String javaSizeBasedClassName = 
HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
 
@@ -82,6 +85,14 @@ public abstract class ClusteringPlanStrategy<T extends 
HoodieRecordPayload,I,K,O
       config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, 
ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
       LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, 
HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), 
ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
       return sparkSizeBasedClassName;
+    } else if (flinkRecentDaysClassName.equals(className)) {
+      config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, 
ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
+      LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, 
HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), 
ClusteringPlanPartitionFilterMode.RECENT_DAYS.name()));
+      return flinkSizeBasedClassName;
+    } else if (flinkSelectedPartitionsClassName.equals(className)) {
+      config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, 
ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
+      LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, 
HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), 
ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
+      return flinkSizeBasedClassName;
     } else if (javaSelectedPartitionClassName.equals(className)) {
       config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, 
ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
       LOG.warn(String.format(logStr, className, javaSizeBasedClassName, 
HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), 
ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 49fa2ec246..ddfbabaf36 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -29,8 +29,10 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -39,6 +41,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.FlinkHoodieIndexFactory;
@@ -68,6 +71,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -399,6 +404,52 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
     throw new HoodieNotSupportedException("Clustering is not supported yet");
   }
 
+  private void completeClustering(
+      HoodieReplaceCommitMetadata metadata,
+      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> table,
+      String clusteringCommitTime) {
+    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect 
clustering write status and commit clustering");
+    HoodieInstant clusteringInstant = new 
HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
+    List<HoodieWriteStat> writeStats = 
metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
+        e.getValue().stream()).collect(Collectors.toList());
+    if 
(writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) 
{
+      throw new HoodieClusteringException("Clustering failed to write to 
files:"
+          + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 
0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
+    }
+
+    try {
+      this.txnManager.beginTransaction(Option.of(clusteringInstant), 
Option.empty());
+      finalizeWrite(table, clusteringCommitTime, writeStats);
+      // commit to data table after committing to metadata table.
+      // Do not do any conflict resolution here as we do with regular writes. 
We take the lock here to ensure all writes to metadata table happens within a
+      // single lock (single writer). Because more than one write to metadata 
table will result in conflicts since all of them updates the same partition.
+      writeTableMetadata(table, clusteringCommitTime, 
clusteringInstant.getAction(), metadata);
+      LOG.info("Committing Clustering {} finished with result {}.", 
clusteringCommitTime, metadata);
+      table.getActiveTimeline().transitionReplaceInflightToComplete(
+          HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
+          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+    } catch (IOException e) {
+      throw new HoodieClusteringException(
+          "Failed to commit " + table.getMetaClient().getBasePath() + " at 
time " + clusteringCommitTime, e);
+    } finally {
+      this.txnManager.endTransaction(Option.of(clusteringInstant));
+    }
+
+    WriteMarkersFactory.get(config.getMarkersType(), table, 
clusteringCommitTime)
+        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+    if (clusteringTimer != null) {
+      long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
+      try {
+        
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(),
+            durationInMs, metadata, 
HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
+      } catch (ParseException e) {
+        throw new HoodieCommitException("Commit time is not of valid format. 
Failed to commit compaction "
+            + config.getBasePath() + " at time " + clusteringCommitTime, e);
+      }
+    }
+    LOG.info("Clustering successfully on commit " + clusteringCommitTime);
+  }
+
   @Override
   protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, 
Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
     // Create a Hoodie table which encapsulated the commits and files visible
@@ -412,6 +463,23 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
     // no need to execute the upgrade/downgrade on each write in streaming.
   }
 
+  public void completeTableService(
+      TableServiceType tableServiceType,
+      HoodieCommitMetadata metadata,
+      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> table,
+      String commitInstant) {
+    switch (tableServiceType) {
+      case CLUSTER:
+        completeClustering((HoodieReplaceCommitMetadata) metadata, table, 
commitInstant);
+        break;
+      case COMPACT:
+        completeCompaction(metadata, table, commitInstant);
+        break;
+      default:
+        throw new IllegalArgumentException("This table service is not valid " 
+ tableServiceType);
+    }
+  }
+
   /**
    * Upgrade downgrade the Hoodie table.
    *
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java
new file mode 100644
index 0000000000..0109aaa60f
--- /dev/null
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Clustering Strategy based on following.
+ * 1) Only looks at latest 'daybased.lookback.partitions' partitions.
+ * 2) Excludes files that are greater than 'small.file.limit' from clustering 
plan.
+ */
+public class FlinkRecentDaysClusteringPlanStrategy<T extends 
HoodieRecordPayload<T>>
+    extends FlinkSizeBasedClusteringPlanStrategy<T> {
+  private static final Logger LOG = 
LogManager.getLogger(FlinkRecentDaysClusteringPlanStrategy.class);
+
+  public FlinkRecentDaysClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable<T> 
table,
+                                               HoodieFlinkEngineContext 
engineContext,
+                                               HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  public FlinkRecentDaysClusteringPlanStrategy(HoodieFlinkMergeOnReadTable<T> 
table,
+                                               HoodieFlinkEngineContext 
engineContext,
+                                               HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  @Override
+  protected List<String> filterPartitionPaths(List<String> partitionPaths) {
+    int targetPartitionsForClustering = 
getWriteConfig().getTargetPartitionsForClustering();
+    int skipPartitionsFromLatestForClustering = 
getWriteConfig().getSkipPartitionsFromLatestForClustering();
+    return partitionPaths.stream()
+        .sorted(Comparator.reverseOrder())
+        .skip(Math.max(skipPartitionsFromLatestForClustering, 0))
+        .limit(targetPartitionsForClustering > 0 ? 
targetPartitionsForClustering : partitionPaths.size())
+        .collect(Collectors.toList());
+  }
+}
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSelectedPartitionsClusteringPlanStrategy.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSelectedPartitionsClusteringPlanStrategy.java
new file mode 100644
index 0000000000..ae5726bb4a
--- /dev/null
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSelectedPartitionsClusteringPlanStrategy.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX;
+
+/**
+ * Clustering Strategy to filter just specified partitions from [begin, end]. 
Note both begin and end are inclusive.
+ */
+public class FlinkSelectedPartitionsClusteringPlanStrategy<T extends 
HoodieRecordPayload<T>>
+    extends FlinkSizeBasedClusteringPlanStrategy<T> {
+  private static final Logger LOG = 
LogManager.getLogger(FlinkSelectedPartitionsClusteringPlanStrategy.class);
+
+  public static final String CONF_BEGIN_PARTITION = 
CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.begin.partition";
+  public static final String CONF_END_PARTITION = 
CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.end.partition";
+  
+  public 
FlinkSelectedPartitionsClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable<T> 
table,
+                                                       
HoodieFlinkEngineContext engineContext,
+                                                       HoodieWriteConfig 
writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  public 
FlinkSelectedPartitionsClusteringPlanStrategy(HoodieFlinkMergeOnReadTable<T> 
table,
+                                                       
HoodieFlinkEngineContext engineContext,
+                                                       HoodieWriteConfig 
writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  @Override
+  protected List<String> filterPartitionPaths(List<String> partitionPaths) {
+    String beginPartition = 
getWriteConfig().getProps().getProperty(CONF_BEGIN_PARTITION);
+    String endPartition = 
getWriteConfig().getProps().getProperty(CONF_END_PARTITION);
+    List<String> filteredPartitions = partitionPaths.stream()
+        .filter(path -> path.compareTo(beginPartition) >= 0 && 
path.compareTo(endPartition) <= 0)
+        .collect(Collectors.toList());
+    LOG.info("Filtered to the following partitions: " + filteredPartitions);
+    return filteredPartitions;
+  }
+}
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
new file mode 100644
index 0000000000..8347da6014
--- /dev/null
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;
+import 
org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
+
+/**
+ * Clustering Strategy based on following.
+ * 1) Creates clustering groups based on max size allowed per group.
+ * 2) Excludes files that are greater than 'small.file.limit' from clustering 
plan.
+ */
+public class FlinkSizeBasedClusteringPlanStrategy<T extends 
HoodieRecordPayload<T>>
+    extends PartitionAwareClusteringPlanStrategy<T, List<HoodieRecord<T>>, 
List<HoodieKey>, List<WriteStatus>> {
+  private static final Logger LOG = 
LogManager.getLogger(FlinkSizeBasedClusteringPlanStrategy.class);
+
+  public FlinkSizeBasedClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable<T> 
table,
+                                              HoodieFlinkEngineContext 
engineContext,
+                                              HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  public FlinkSizeBasedClusteringPlanStrategy(HoodieFlinkMergeOnReadTable<T> 
table,
+                                              HoodieFlinkEngineContext 
engineContext,
+                                              HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  @Override
+  protected Stream<HoodieClusteringGroup> 
buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> 
fileSlices) {
+    HoodieWriteConfig writeConfig = getWriteConfig();
+
+    List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
+    List<FileSlice> currentGroup = new ArrayList<>();
+    long totalSizeSoFar = 0;
+
+    for (FileSlice currentSlice : fileSlices) {
+      // check if max size is reached and create new group, if needed.
+      // in now, every clustering group out put is 1 file group.
+      if (totalSizeSoFar >= writeConfig.getClusteringTargetFileMaxBytes() && 
!currentGroup.isEmpty()) {
+        LOG.info("Adding one clustering group " + totalSizeSoFar + " max 
bytes: "
+            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: 
" + currentGroup.size());
+        fileSliceGroups.add(Pair.of(currentGroup, 1));
+        currentGroup = new ArrayList<>();
+        totalSizeSoFar = 0;
+      }
+
+      // Add to the current file-group
+      currentGroup.add(currentSlice);
+      // assume each file group size is ~= parquet.max.file.size
+      totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? 
currentSlice.getBaseFile().get().getFileSize() : 
writeConfig.getParquetMaxFileSize();
+    }
+
+    if (!currentGroup.isEmpty()) {
+      fileSliceGroups.add(Pair.of(currentGroup, 1));
+    }
+
+    return fileSliceGroups.stream().map(fileSliceGroup ->
+        HoodieClusteringGroup.newBuilder()
+            .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
+            .setNumOutputFileGroups(fileSliceGroup.getRight())
+            .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
+            .build());
+  }
+
+  @Override
+  protected Map<String, String> getStrategyParams() {
+    Map<String, String> params = new HashMap<>();
+    if 
(!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) {
+      params.put(PLAN_STRATEGY_SORT_COLUMNS.key(), 
getWriteConfig().getClusteringSortColumns());
+    }
+    return params;
+  }
+
+  @Override
+  protected List<String> filterPartitionPaths(List<String> partitionPaths) {
+    return partitionPaths;
+  }
+
+  @Override
+  protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String 
partition) {
+    return super.getFileSlicesEligibleForClustering(partition)
+        // Only files that have basefile size smaller than small file size are 
eligible.
+        .filter(slice -> 
slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < 
getWriteConfig().getClusteringSmallFileLimit());
+  }
+
+  private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) 
{
+    return (int) Math.ceil(groupSize / (double) targetFileSize);
+  }
+}
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 9ab633f9e3..0e5f1c26e3 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -55,6 +55,7 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
 import org.apache.hudi.table.action.clean.CleanActionExecutor;
 import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
+import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
 import 
org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor;
@@ -286,7 +287,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends 
HoodieRecordPayload>
 
   @Override
   public Option<HoodieClusteringPlan> scheduleClustering(final 
HoodieEngineContext context, final String instantTime, final Option<Map<String, 
String>> extraMetadata) {
-    throw new HoodieNotSupportedException("Clustering is not supported on a 
Flink CopyOnWrite table");
+    return new ClusteringPlanActionExecutor<>(context, config,this, 
instantTime, extraMetadata).execute();
   }
 
   @Override
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
index eb3d4ef312..b9e466485f 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
@@ -49,7 +49,7 @@ public class JavaCustomColumnsSortPartitioner<T extends 
HoodieRecordPayload>
 
   @Override
   public List<HoodieRecord<T>> repartitionRecords(
-      List<HoodieRecord<T>> records, int outputSparkPartitions) {
+      List<HoodieRecord<T>> records, int outputPartitions) {
     return records.stream().sorted((o1, o2) -> {
       Object values1 = HoodieAvroUtils.getRecordColumnValues(o1, 
sortColumnNames, schema, consistentLogicalTimestampEnabled);
       Object values2 = HoodieAvroUtils.getRecordColumnValues(o2, 
sortColumnNames, schema, consistentLogicalTimestampEnabled);
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java
index fded0ffab5..d272849a19 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java
@@ -37,7 +37,7 @@ public class JavaGlobalSortPartitioner<T extends 
HoodieRecordPayload>
 
   @Override
   public List<HoodieRecord<T>> repartitionRecords(List<HoodieRecord<T>> 
records,
-                                                  int outputSparkPartitions) {
+                                                  int outputPartitions) {
     // Now, sort the records and line them up nicely for loading.
     records.sort(new Comparator() {
       @Override
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 729f0147b5..3de4bd4f75 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.configuration;
 
+import 
org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy;
 import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.HoodieConfig;
@@ -583,6 +584,72 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(40)// default min 40 commits
       .withDescription("Min number of commits to keep before archiving older 
commits into a sequential log, default 40");
 
+  // ------------------------------------------------------------------------
+  //  Clustering Options
+  // ------------------------------------------------------------------------
+
+  public static final ConfigOption<Boolean> CLUSTERING_SCHEDULE_ENABLED = 
ConfigOptions
+      .key("clustering.schedule.enabled")
+      .booleanType()
+      .defaultValue(false) // default false for pipeline
+      .withDescription("Schedule the cluster plan, default false");
+
+  public static final ConfigOption<Integer> CLUSTERING_DELTA_COMMITS = 
ConfigOptions
+      .key("clustering.delta_commits")
+      .intType()
+      .defaultValue(4)
+      .withDescription("Max delta commits needed to trigger clustering, 
default 4 commits");
+
+  public static final ConfigOption<Integer> CLUSTERING_TASKS = ConfigOptions
+      .key("clustering.tasks")
+      .intType()
+      .defaultValue(4)
+      .withDescription("Parallelism of tasks that do actual clustering, 
default is 4");
+
+  public static final ConfigOption<Integer> CLUSTERING_TARGET_PARTITIONS = 
ConfigOptions
+      .key("clustering.plan.strategy.daybased.lookback.partitions")
+      .intType()
+      .defaultValue(2)
+      .withDescription("Number of partitions to list to create ClusteringPlan, 
default is 2");
+
+  public static final ConfigOption<String> CLUSTERING_PLAN_STRATEGY_CLASS = 
ConfigOptions
+      .key("clustering.plan.strategy.class")
+      .stringType()
+      .defaultValue(FlinkRecentDaysClusteringPlanStrategy.class.getName())
+      .withDescription("Config to provide a strategy class (subclass of 
ClusteringPlanStrategy) to create clustering plan "
+          + "i.e select what file groups are being clustered. Default 
strategy, looks at the last N (determined by "
+          + CLUSTERING_TARGET_PARTITIONS.key() + ") day based partitions picks 
the small file slices within those partitions.");
+
+  public static final ConfigOption<Integer> 
CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
+      .key("clustering.plan.strategy.target.file.max.bytes")
+      .intType()
+      .defaultValue(1024 * 1024 * 1024) // default 1 GB
+      .withDescription("Each group can produce 'N' 
(CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, 
default 1 GB");
+
+  public static final ConfigOption<Integer> 
CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions
+      .key("clustering.plan.strategy.small.file.limit")
+      .intType()
+      .defaultValue(600) // default 600 MB
+      .withDescription("Files smaller than the size specified here are 
candidates for clustering, default 600 MB");
+
+  public static final ConfigOption<Integer> 
CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigOptions
+      .key("clustering.plan.strategy.daybased.skipfromlatest.partitions")
+      .intType()
+      .defaultValue(0)
+      .withDescription("Number of partitions to skip from latest when choosing 
partitions to create ClusteringPlan");
+
+  public static final ConfigOption<String> CLUSTERING_SORT_COLUMNS = 
ConfigOptions
+      .key("clustering.plan.strategy.sort.columns")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("Columns to sort the data by when clustering");
+
+  public static final ConfigOption<Integer> CLUSTERING_MAX_NUM_GROUPS = 
ConfigOptions
+      .key("clustering.plan.strategy.max.num.groups")
+      .intType()
+      .defaultValue(30)
+      .withDescription("Maximum number of groups to create as part of 
ClusteringPlan. Increasing groups will increase parallelism, default is 30");
+
   // ------------------------------------------------------------------------
   //  Hive Sync Options
   // ------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java
index 4d3fc08efe..b5599886a9 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java
@@ -48,7 +48,7 @@ public class SortOperatorGen {
         codeGen.generateRecordComparator("SortComparator"));
   }
 
-  private SortCodeGenerator createSortCodeGenerator() {
+  public SortCodeGenerator createSortCodeGenerator() {
     SortSpec.SortSpecBuilder builder = SortSpec.builder();
     IntStream.range(0, sortIndices.length).forEach(i -> builder.addField(i, 
true, true));
     return new SortCodeGenerator(tableConfig, rowType, builder.build());
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitEvent.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitEvent.java
new file mode 100644
index 0000000000..30a8fbed3f
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitEvent.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.client.WriteStatus;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Represents a commit event from the clustering task {@link 
ClusteringFunction}.
+ */
+public class ClusteringCommitEvent implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The clustering commit instant time.
+   */
+  private String instant;
+  /**
+   * The write statuses.
+   */
+  private List<WriteStatus> writeStatuses;
+  /**
+   * The clustering task identifier.
+   */
+  private int taskID;
+
+  public ClusteringCommitEvent() {
+  }
+
+  public ClusteringCommitEvent(String instant, List<WriteStatus> 
writeStatuses, int taskID) {
+    this.instant = instant;
+    this.writeStatuses = writeStatuses;
+    this.taskID = taskID;
+  }
+
+  public void setInstant(String instant) {
+    this.instant = instant;
+  }
+
+  public void setWriteStatuses(List<WriteStatus> writeStatuses) {
+    this.writeStatuses = writeStatuses;
+  }
+
+  public void setTaskID(int taskID) {
+    this.taskID = taskID;
+  }
+
+  public String getInstant() {
+    return instant;
+  }
+
+  public List<WriteStatus> getWriteStatuses() {
+    return writeStatuses;
+  }
+
+  public int getTaskID() {
+    return taskID;
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
new file mode 100644
index 0000000000..bc87270a49
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.TableServiceType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.sink.CleanFunction;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Function to check and commit the clustering action.
+ *
+ * <p> Each time after receiving a clustering commit event {@link 
ClusteringCommitEvent},
+ * it loads and checks the clustering plan {@link 
org.apache.hudi.avro.model.HoodieClusteringPlan},
+ * if all the clustering operations {@link 
org.apache.hudi.common.model.ClusteringOperation}
+ * of the plan are finished, tries to commit the clustering action.
+ *
+ * <p>It also inherits the {@link CleanFunction} cleaning ability. This is 
needed because
+ * the SQL API does not allow multiple sinks in one table sink provider.
+ */
+public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> 
{
+  private static final Logger LOG = 
LoggerFactory.getLogger(ClusteringCommitSink.class);
+
+  /**
+   * Config options.
+   */
+  private final Configuration conf;
+
+  private transient HoodieFlinkTable<?> table;
+
+  /**
+   * Buffer to collect the event from each clustering task {@code 
ClusteringFunction}.
+   * The key is the instant time.
+   */
+  private transient Map<String, List<ClusteringCommitEvent>> commitBuffer;
+
+  public ClusteringCommitSink(Configuration conf) {
+    super(conf);
+    this.conf = conf;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    if (writeClient == null) {
+      this.writeClient = StreamerUtil.createWriteClient(conf, 
getRuntimeContext());
+    }
+    this.commitBuffer = new HashMap<>();
+    this.table = writeClient.getHoodieTable();
+  }
+
+  @Override
+  public void invoke(ClusteringCommitEvent event, Context context) throws 
Exception {
+    final String instant = event.getInstant();
+    commitBuffer.computeIfAbsent(instant, k -> new ArrayList<>())
+        .add(event);
+    commitIfNecessary(instant, commitBuffer.get(instant));
+  }
+
+  /**
+   * Condition to commit: the commit buffer has equal size with the clustering 
plan operations
+   * and all the clustering commit event {@link ClusteringCommitEvent} has the 
same clustering instant time.
+   *
+   * @param instant Clustering commit instant time
+   * @param events  Commit events ever received for the instant
+   */
+  private void commitIfNecessary(String instant, List<ClusteringCommitEvent> 
events) {
+    HoodieInstant clusteringInstant = 
HoodieTimeline.getReplaceCommitInflightInstant(instant);
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = 
ClusteringUtils.getClusteringPlan(
+        StreamerUtil.createMetaClient(this.conf), clusteringInstant);
+    HoodieClusteringPlan clusteringPlan = 
clusteringPlanOption.get().getRight();
+    boolean isReady = clusteringPlan.getInputGroups().size() == events.size();
+    if (!isReady) {
+      return;
+    }
+    List<WriteStatus> statuses = events.stream()
+        .map(ClusteringCommitEvent::getWriteStatuses)
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
+
+    HoodieWriteMetadata<List<WriteStatus>> writeMetadata = new 
HoodieWriteMetadata<>();
+    writeMetadata.setWriteStatuses(statuses);
+    
writeMetadata.setWriteStats(statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()));
+    
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan,
 writeMetadata));
+    validateWriteResult(clusteringPlan, instant, writeMetadata);
+    if (!writeMetadata.getCommitMetadata().isPresent()) {
+      HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(
+          writeMetadata.getWriteStats().get(),
+          writeMetadata.getPartitionToReplaceFileIds(),
+          Option.empty(),
+          WriteOperationType.CLUSTER,
+          this.writeClient.getConfig().getSchema(),
+          HoodieTimeline.REPLACE_COMMIT_ACTION);
+      writeMetadata.setCommitMetadata(Option.of(commitMetadata));
+    }
+    // commit the clustering
+    this.table.getMetaClient().reloadActiveTimeline();
+    this.writeClient.completeTableService(
+        TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), 
table, instant);
+
+    // reset the status
+    reset(instant);
+  }
+
+  private void reset(String instant) {
+    this.commitBuffer.remove(instant);
+  }
+
+  /**
+   * Validate actions taken by clustering. In the first implementation, we 
validate at least one new file is written.
+   * But we can extend this to add more validation. E.g. number of records 
read = number of records written etc.
+   * We can also make these validations in BaseCommitActionExecutor to reuse 
pre-commit hooks for multiple actions.
+   */
+  private static void validateWriteResult(HoodieClusteringPlan clusteringPlan, 
String instantTime, HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
+    if (writeMetadata.getWriteStatuses().isEmpty()) {
+      throw new HoodieClusteringException("Clustering plan produced 0 
WriteStatus for " + instantTime
+          + " #groups: " + clusteringPlan.getInputGroups().size() + " expected 
at least "
+          + 
clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
+          + " write statuses");
+    }
+  }
+
+  private static Map<String, List<String>> getPartitionToReplacedFileIds(
+      HoodieClusteringPlan clusteringPlan,
+      HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
+    Set<HoodieFileGroupId> newFilesWritten = 
writeMetadata.getWriteStats().get().stream()
+        .map(s -> new HoodieFileGroupId(s.getPartitionPath(), 
s.getFileId())).collect(Collectors.toSet());
+    return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
+        .filter(fg -> !newFilesWritten.contains(fg))
+        .collect(Collectors.groupingBy(HoodieFileGroupId::getPartitionPath, 
Collectors.mapping(HoodieFileGroupId::getFileId, Collectors.toList())));
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
new file mode 100644
index 0000000000..a415ac9d46
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.common.model.ClusteringGroupInfo;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.HoodieFileSliceReader;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
+import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.sort.BinaryExternalSorter;
+import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
+
+/**
+ * Operator to execute the actual clustering task assigned by the clustering 
plan task.
+ * In order to execute scalable, the input should shuffle by the clustering 
event {@link ClusteringPlanEvent}.
+ */
+public class ClusteringOperator extends 
TableStreamOperator<ClusteringCommitEvent> implements
+    OneInputStreamOperator<ClusteringPlanEvent, ClusteringCommitEvent>, 
BoundedOneInput {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ClusteringOperator.class);
+
+  private final Configuration conf;
+  private final RowType rowType;
+  private int taskID;
+  private transient HoodieWriteConfig writeConfig;
+  private transient HoodieFlinkTable<?> table;
+  private transient Schema schema;
+  private transient Schema readerSchema;
+  private transient int[] requiredPos;
+  private transient AvroToRowDataConverters.AvroToRowDataConverter 
avroToRowDataConverter;
+  private transient HoodieFlinkWriteClient writeClient;
+  private transient BulkInsertWriterHelper writerHelper;
+  private transient String instantTime;
+
+  private transient BinaryExternalSorter sorter;
+  private transient StreamRecordCollector<ClusteringCommitEvent> collector;
+  private transient BinaryRowDataSerializer binarySerializer;
+
+  public ClusteringOperator(Configuration conf, RowType rowType) {
+    this.conf = conf;
+    this.rowType = rowType;
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+
+    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+    this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
+    this.writeClient = StreamerUtil.createWriteClient(conf, 
getRuntimeContext());
+    this.table = writeClient.getHoodieTable();
+
+    this.schema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), 
false);
+    this.readerSchema = HoodieAvroUtils.addMetadataFields(this.schema);
+    this.requiredPos = getRequiredPositions();
+
+    this.avroToRowDataConverter = 
AvroToRowDataConverters.createRowConverter(rowType);
+
+    ClassLoader cl = getContainingTask().getUserCodeClassLoader();
+
+    AbstractRowDataSerializer inputSerializer = new 
BinaryRowDataSerializer(rowType.getFieldCount());
+    this.binarySerializer = new 
BinaryRowDataSerializer(inputSerializer.getArity());
+
+    NormalizedKeyComputer computer = 
createSortCodeGenerator().generateNormalizedKeyComputer("SortComputer").newInstance(cl);
+    RecordComparator comparator = 
createSortCodeGenerator().generateRecordComparator("SortComparator").newInstance(cl);
+
+    MemoryManager memManager = 
getContainingTask().getEnvironment().getMemoryManager();
+    this.sorter =
+        new BinaryExternalSorter(
+            this.getContainingTask(),
+            memManager,
+            computeMemorySize(),
+            this.getContainingTask().getEnvironment().getIOManager(),
+            inputSerializer,
+            binarySerializer,
+            computer,
+            comparator,
+            getContainingTask().getJobConfiguration());
+    this.sorter.startThreads();
+
+    collector = new StreamRecordCollector<>(output);
+
+    // register the metrics.
+    getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) 
sorter::getUsedMemoryInBytes);
+    getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) 
sorter::getNumSpillFiles);
+    getMetricGroup().gauge("spillInBytes", (Gauge<Long>) 
sorter::getSpillInBytes);
+  }
+
+  @Override
+  public void processElement(StreamRecord<ClusteringPlanEvent> element) throws 
Exception {
+    ClusteringPlanEvent event = element.getValue();
+    final String instantTime = event.getClusteringInstantTime();
+    final ClusteringGroupInfo clusteringGroupInfo = 
event.getClusteringGroupInfo();
+
+    initWriterHelper(instantTime);
+
+    List<ClusteringOperation> clusteringOps = 
clusteringGroupInfo.getOperations();
+    boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> 
op.getDeltaFilePaths().size() > 0);
+
+    Iterator<RowData> iterator;
+    if (hasLogFiles) {
+      // if there are log files, we read all records into memory for a file 
group and apply updates.
+      iterator = readRecordsForGroupWithLogs(clusteringOps, instantTime);
+    } else {
+      // We want to optimize reading records for case there are no log files.
+      iterator = readRecordsForGroupBaseFiles(clusteringOps);
+    }
+
+    RowDataSerializer rowDataSerializer = new RowDataSerializer(rowType);
+    while (iterator.hasNext()) {
+      RowData rowData = iterator.next();
+      BinaryRowData binaryRowData = 
rowDataSerializer.toBinaryRow(rowData).copy();
+      this.sorter.write(binaryRowData);
+    }
+
+    BinaryRowData row = binarySerializer.createInstance();
+    while ((row = sorter.getIterator().next(row)) != null) {
+      this.writerHelper.write(row);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (this.writeClient != null) {
+      this.writeClient.cleanHandlesGracefully();
+      this.writeClient.close();
+    }
+  }
+
+  /**
+   * End input action for batch source.
+   */
+  public void endInput() {
+    List<WriteStatus> writeStatuses = 
this.writerHelper.getWriteStatuses(this.taskID);
+    collector.collect(new ClusteringCommitEvent(instantTime, writeStatuses, 
this.taskID));
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private void initWriterHelper(String clusteringInstantTime) {
+    if (this.writerHelper == null) {
+      this.writerHelper = new BulkInsertWriterHelper(this.conf, this.table, 
this.writeConfig,
+          clusteringInstantTime, this.taskID, 
getRuntimeContext().getNumberOfParallelSubtasks(), 
getRuntimeContext().getAttemptNumber(),
+          this.rowType);
+      this.instantTime = clusteringInstantTime;
+    }
+  }
+
+  /**
+   * Read records from baseFiles, apply updates and convert to Iterator.
+   */
+  @SuppressWarnings("unchecked")
+  private Iterator<RowData> 
readRecordsForGroupWithLogs(List<ClusteringOperation> clusteringOps, String 
instantTime) {
+    List<Iterator<RowData>> recordIterators = new ArrayList<>();
+
+    long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new 
FlinkTaskContextSupplier(null), writeConfig);
+    LOG.info("MaxMemoryPerCompaction run as part of clustering => " + 
maxMemoryPerCompaction);
+
+    for (ClusteringOperation clusteringOp : clusteringOps) {
+      try {
+        Option<HoodieFileReader> baseFileReader = 
StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
+            ? Option.empty()
+            : 
Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new 
Path(clusteringOp.getDataFilePath())));
+        HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
+            .withFileSystem(table.getMetaClient().getFs())
+            .withBasePath(table.getMetaClient().getBasePath())
+            .withLogFilePaths(clusteringOp.getDeltaFilePaths())
+            .withReaderSchema(readerSchema)
+            .withLatestInstantTime(instantTime)
+            .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
+            
.withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
+            
.withReverseReader(writeConfig.getCompactionReverseLogReadEnabled())
+            .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
+            .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
+            .build();
+
+        HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+        HoodieFileSliceReader<? extends IndexedRecord> hoodieFileSliceReader = 
HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema,
+            tableConfig.getPayloadClass(),
+            tableConfig.getPreCombineField(),
+            tableConfig.populateMetaFields() ? Option.empty() : 
Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
+                tableConfig.getPartitionFieldProp())));
+
+        
recordIterators.add(StreamSupport.stream(Spliterators.spliteratorUnknownSize(hoodieFileSliceReader,
 Spliterator.NONNULL), false).map(hoodieRecord -> {
+          try {
+            return this.transform((IndexedRecord) 
hoodieRecord.getData().getInsertValue(readerSchema).get());
+          } catch (IOException e) {
+            throw new HoodieIOException("Failed to read next record", e);
+          }
+        }).iterator());
+      } catch (IOException e) {
+        throw new HoodieClusteringException("Error reading input data for " + 
clusteringOp.getDataFilePath()
+            + " and " + clusteringOp.getDeltaFilePaths(), e);
+      }
+    }
+
+    return new ConcatenatingIterator<>(recordIterators);
+  }
+
+  /**
+   * Read records from baseFiles and get iterator.
+   */
+  private Iterator<RowData> 
readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
+    List<Iterator<RowData>> iteratorsForPartition = 
clusteringOps.stream().map(clusteringOp -> {
+      Iterable<IndexedRecord> indexedRecords = () -> {
+        try {
+          return HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), 
new Path(clusteringOp.getDataFilePath())).getRecordIterator(readerSchema);
+        } catch (IOException e) {
+          throw new HoodieClusteringException("Error reading input data for " 
+ clusteringOp.getDataFilePath()
+              + " and " + clusteringOp.getDeltaFilePaths(), e);
+        }
+      };
+
+      return StreamSupport.stream(indexedRecords.spliterator(), 
false).map(this::transform).iterator();
+    }).collect(Collectors.toList());
+
+    return new ConcatenatingIterator<>(iteratorsForPartition);
+  }
+
+  /**
+   * Transform IndexedRecord into HoodieRecord.
+   */
+  private RowData transform(IndexedRecord indexedRecord) {
+    GenericRecord record = buildAvroRecordBySchema(indexedRecord, schema, 
requiredPos, new GenericRecordBuilder(schema));
+    return (RowData) avroToRowDataConverter.convert(record);
+  }
+
+  private int[] getRequiredPositions() {
+    final List<String> fieldNames = 
readerSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+    return schema.getFields().stream()
+        .map(field -> fieldNames.indexOf(field.name()))
+        .mapToInt(i -> i)
+        .toArray();
+  }
+
+  private SortCodeGenerator createSortCodeGenerator() {
+    SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType,
+        conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS).split(","));
+    return sortOperatorGen.createSortCodeGenerator();
+  }
+
+  @Override
+  public void setKeyContextElement(StreamRecord<ClusteringPlanEvent> record) 
throws Exception {
+    OneInputStreamOperator.super.setKeyContextElement(record);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanEvent.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanEvent.java
new file mode 100644
index 0000000000..c82075877b
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanEvent.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.common.model.ClusteringGroupInfo;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Represents a cluster command from the clustering plan task {@link 
ClusteringPlanSourceFunction}.
+ */
+public class ClusteringPlanEvent implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private String clusteringInstantTime;
+
+  private ClusteringGroupInfo clusteringGroupInfo;
+
+  private Map<String, String> strategyParams;
+
+  public ClusteringPlanEvent() {
+  }
+
+  public ClusteringPlanEvent(
+      String instantTime,
+      ClusteringGroupInfo clusteringGroupInfo,
+      Map<String, String> strategyParams) {
+    this.clusteringInstantTime = instantTime;
+    this.clusteringGroupInfo = clusteringGroupInfo;
+    this.strategyParams = strategyParams;
+  }
+
+  public void setClusteringInstantTime(String clusteringInstantTime) {
+    this.clusteringInstantTime = clusteringInstantTime;
+  }
+
+  public void setClusteringGroupInfo(ClusteringGroupInfo clusteringGroupInfo) {
+    this.clusteringGroupInfo = clusteringGroupInfo;
+  }
+
+  public void setStrategyParams(Map<String, String> strategyParams) {
+    this.strategyParams = strategyParams;
+  }
+
+  public String getClusteringInstantTime() {
+    return clusteringInstantTime;
+  }
+
+  public ClusteringGroupInfo getClusteringGroupInfo() {
+    return clusteringGroupInfo;
+  }
+
+  public Map<String, String> getStrategyParams() {
+    return strategyParams;
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
new file mode 100644
index 0000000000..a3db2d41c8
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.common.model.ClusteringGroupInfo;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flink hudi clustering source function.
+ *
+ * <P>This function read the clustering plan as {@link ClusteringOperation}s 
then assign the clustering task
+ * event {@link ClusteringPlanEvent} to downstream operators.
+ *
+ * <p>The clustering instant time is specified explicitly with strategies:
+ *
+ * <ul>
+ *   <li>If the timeline has no inflight instants,
+ *   use {@link 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()}
+ *   as the instant time;</li>
+ *   <li>If the timeline has inflight instants,
+ *   use the median instant time between [last complete instant time, earliest 
inflight instant time]
+ *   as the instant time.</li>
+ * </ul>
+ */
+public class ClusteringPlanSourceFunction extends AbstractRichFunction 
implements SourceFunction<ClusteringPlanEvent> {
+
+  protected static final Logger LOG = 
LoggerFactory.getLogger(ClusteringPlanSourceFunction.class);
+
+  /**
+   * The clustering plan.
+   */
+  private final HoodieClusteringPlan clusteringPlan;
+
+  /**
+   * Hoodie instant.
+   */
+  private final HoodieInstant instant;
+
+  public ClusteringPlanSourceFunction(HoodieInstant instant, 
HoodieClusteringPlan clusteringPlan) {
+    this.instant = instant;
+    this.clusteringPlan = clusteringPlan;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    // no operation
+  }
+
+  @Override
+  public void run(SourceContext<ClusteringPlanEvent> sourceContext) throws 
Exception {
+    for (HoodieClusteringGroup clusteringGroup : 
clusteringPlan.getInputGroups()) {
+      LOG.info("ClusteringPlanSourceFunction cluster " + clusteringGroup + " 
files");
+      sourceContext.collect(new 
ClusteringPlanEvent(this.instant.getTimestamp(), 
ClusteringGroupInfo.create(clusteringGroup), 
clusteringPlan.getStrategy().getStrategyParams()));
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    // no operation
+  }
+
+  @Override
+  public void cancel() {
+    // no operation
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
new file mode 100644
index 0000000000..e87a7d6752
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.configuration.FlinkOptions;
+
+import com.beust.jcommander.Parameter;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Configurations for Hoodie Flink clustering.
+ */
+public class FlinkClusteringConfig extends Configuration {
+
+  @Parameter(names = {"--help", "-h"}, help = true)
+  public Boolean help = false;
+
+  // ------------------------------------------------------------------------
+  //  Hudi Write Options
+  // ------------------------------------------------------------------------
+
+  @Parameter(names = {"--path"}, description = "Base path for the target 
hoodie table.", required = true)
+  public String path;
+
+  // ------------------------------------------------------------------------
+  //  Clustering Options
+  // ------------------------------------------------------------------------
+  @Parameter(names = {"--clustering-delta-commits"}, description = "Max delta 
commits needed to trigger clustering, default 4 commits", required = false)
+  public Integer clusteringDeltaCommits = 1;
+
+  @Parameter(names = {"--clustering-tasks"}, description = "Parallelism of 
tasks that do actual clustering, default is -1", required = false)
+  public Integer clusteringTasks = -1;
+
+  @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in 
MB for compaction spillable map, default 100MB.", required = false)
+  public Integer compactionMaxMemory = 100;
+
+  @Parameter(names = {"--clean-retain-commits"},
+      description = "Number of commits to retain. So data will be retained for 
num_of_commits * time_between_commits (scheduled).\n"
+          + "This also directly translates into how much you can incrementally 
pull on this table, default 10",
+      required = false)
+  public Integer cleanRetainCommits = 10;
+
+  @Parameter(names = {"--archive-min-commits"},
+      description = "Min number of commits to keep before archiving older 
commits into a sequential log, default 20.",
+      required = false)
+  public Integer archiveMinCommits = 20;
+
+  @Parameter(names = {"--archive-max-commits"},
+      description = "Max number of commits to keep before archiving older 
commits into a sequential log, default 30.",
+      required = false)
+  public Integer archiveMaxCommits = 30;
+
+  @Parameter(names = {"--schedule", "-sc"}, description = "Not recommended. 
Schedule the clustering plan in this job.\n"
+      + "There is a risk of losing data when scheduling clustering outside the 
writer job.\n"
+      + "Scheduling clustering in the writer job and only let this job do the 
clustering execution is recommended.\n"
+      + "Default is true", required = false)
+  public Boolean schedule = true;
+
+  @Parameter(names = {"--clean-async-enabled"}, description = "Whether to 
cleanup the old commits immediately on new commits, enabled by default", 
required = false)
+  public Boolean cleanAsyncEnable = false;
+
+  @Parameter(names = {"--plan-strategy-class"}, description = "Config to 
provide a strategy class to generator clustering plan", required = false)
+  public String planStrategyClass = 
"org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy";
+
+  @Parameter(names = {"--target-file-max-bytes"}, description = "Each group 
can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output 
file groups, default 1 GB", required = false)
+  public Integer targetFileMaxBytes = 1024 * 1024 * 1024;
+
+  @Parameter(names = {"--small-file-limit"}, description = "Files smaller than 
the size specified here are candidates for clustering, default 600 MB", 
required = false)
+  public Integer smallFileLimit = 600;
+
+  @Parameter(names = {"--skip-from-latest-partitions"}, description = "Number 
of partitions to skip from latest when choosing partitions to create 
ClusteringPlan, default 0", required = false)
+  public Integer skipFromLatestPartitions = 0;
+
+  @Parameter(names = {"--sort-columns"}, description = "Columns to sort the 
data by when clustering.", required = false)
+  public String sortColumns = "";
+
+  @Parameter(names = {"--max-num-groups"}, description = "Maximum number of 
groups to create as part of ClusteringPlan. Increasing groups will increase 
parallelism. default 30", required = false)
+  public Integer maxNumGroups = 30;
+
+  @Parameter(names = {"--target-partitions"}, description = "Number of 
partitions to list to create ClusteringPlan, default 2", required = false)
+  public Integer targetPartitions = 2;
+
+  public static final String SEQ_FIFO = "FIFO";
+  public static final String SEQ_LIFO = "LIFO";
+  @Parameter(names = {"--seq"}, description = "Clustering plan execution 
sequence, two options are supported:\n"
+      + "1). FIFO: execute the oldest plan first;\n"
+      + "2). LIFO: execute the latest plan first, by default LIFO", required = 
false)
+  public String clusteringSeq = SEQ_LIFO;
+
+  @Parameter(names = {"--write-partition-url-encode"}, description = "Whether 
to encode the partition path url, default false")
+  public Boolean writePartitionUrlEncode = false;
+
+  @Parameter(names = {"--hive-style-partitioning"}, description = "Whether to 
use Hive style partitioning.\n"
+      + "If set true, the names of partition folders follow 
<partition_column_name>=<partition_value> format.\n"
+      + "By default false (the names of partition folders are only partition 
values)")
+  public Boolean hiveStylePartitioning = false;
+
+  /**
+   * Transforms a {@code FlinkClusteringConfig.config} into {@code 
Configuration}.
+   * The latter is more suitable for the table APIs. It reads all the 
properties
+   * in the properties file (set by `--props` option) and cmd line options
+   * (set by `--hoodie-conf` option).
+   */
+  public static Configuration toFlinkConfig(FlinkClusteringConfig config) {
+    Configuration conf = new Configuration();
+
+    conf.setString(FlinkOptions.PATH, config.path);
+    conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, 
config.archiveMaxCommits);
+    conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, 
config.archiveMinCommits);
+    conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, 
config.cleanRetainCommits);
+    conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, 
config.compactionMaxMemory);
+    conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 
config.clusteringDeltaCommits);
+    conf.setInteger(FlinkOptions.CLUSTERING_TASKS, config.clusteringTasks);
+    conf.setString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS, 
config.planStrategyClass);
+    
conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES, 
config.targetFileMaxBytes);
+    conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT, 
config.smallFileLimit);
+    
conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST,
 config.skipFromLatestPartitions);
+    conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, config.sortColumns);
+    conf.setInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS, 
config.maxNumGroups);
+    conf.setInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS, 
config.targetPartitions);
+    conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable);
+
+    // use synchronous clustering always
+    conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, config.schedule);
+
+    // bulk insert conf
+    conf.setBoolean(FlinkOptions.URL_ENCODE_PARTITIONING, 
config.writePartitionUrlEncode);
+    conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, 
config.hiveStylePartitioning);
+
+    return conf;
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
new file mode 100644
index 0000000000..f7c361533a
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+
+import com.beust.jcommander.JCommander;
+import org.apache.avro.Schema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flink hudi clustering program that can be executed manually.
+ */
+public class HoodieFlinkClusteringJob {
+
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HoodieFlinkClusteringJob.class);
+
+  public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+
+    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+    // create metaClient
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+    // set table name
+    conf.setString(FlinkOptions.TABLE_NAME, 
metaClient.getTableConfig().getTableName());
+
+    // set table type
+    conf.setString(FlinkOptions.TABLE_TYPE, 
metaClient.getTableConfig().getTableType().name());
+
+    // set record key field
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, 
metaClient.getTableConfig().getRecordKeyFieldProp());
+
+    // set partition field
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, 
metaClient.getTableConfig().getPartitionFieldProp());
+
+    // set table schema
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+    // judge whether have operation
+    // to compute the clustering instant time and do cluster.
+    if (cfg.schedule) {
+      String clusteringInstantTime = 
HoodieActiveTimeline.createNewInstantTime();
+      boolean scheduled = 
writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+      if (!scheduled) {
+        // do nothing.
+        LOG.info("No clustering plan for this job ");
+        return;
+      }
+    }
+
+    table.getMetaClient().reloadActiveTimeline();
+
+    // fetch the instant based on the configured execution sequence
+    HoodieTimeline timeline = 
table.getActiveTimeline().filterPendingReplaceTimeline()
+        .filter(instant -> instant.getState() == 
HoodieInstant.State.REQUESTED);
+    Option<HoodieInstant> requested = CompactionUtil.isLIFO(cfg.clusteringSeq) 
? timeline.lastInstant() : timeline.firstInstant();
+    if (!requested.isPresent()) {
+      // do nothing.
+      LOG.info("No clustering plan scheduled, turns on the clustering plan 
schedule with --schedule option");
+      return;
+    }
+
+    HoodieInstant clusteringInstant = requested.get();
+
+    HoodieInstant inflightInstant = 
HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp());
+    if (timeline.containsInstant(inflightInstant)) {
+      LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + 
"]");
+      writeClient.rollbackInflightClustering(inflightInstant, table);
+      table.getMetaClient().reloadActiveTimeline();
+    }
+
+    // generate clustering plan
+    // should support configurable commit metadata
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = 
ClusteringUtils.getClusteringPlan(
+        table.getMetaClient(), clusteringInstant);
+
+    if (!clusteringPlanOption.isPresent()) {
+      // do nothing.
+      LOG.info("No clustering plan scheduled, turns on the clustering plan 
schedule with --schedule option");
+      return;
+    }
+
+    HoodieClusteringPlan clusteringPlan = 
clusteringPlanOption.get().getRight();
+
+    if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null)
+        || (clusteringPlan.getInputGroups().isEmpty())) {
+      // No clustering plan, do nothing and return.
+      LOG.info("No clustering plan for instant " + 
clusteringInstant.getTimestamp());
+      return;
+    }
+
+    HoodieInstant instant = 
HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp());
+    HoodieTimeline pendingClusteringTimeline = 
table.getActiveTimeline().filterPendingReplaceTimeline();
+    if (!pendingClusteringTimeline.containsInstant(instant)) {
+      // this means that the clustering plan was written to auxiliary 
path(.tmp)
+      // but not the meta path(.hoodie), this usually happens when the job 
crush
+      // exceptionally.
+
+      // clean the clustering plan in auxiliary path and cancels the 
clustering.
+
+      LOG.warn("The clustering plan was fetched through the auxiliary 
path(.tmp) but not the meta path(.hoodie).\n"
+          + "Clean the clustering plan in auxiliary path and cancels the 
clustering");
+      CompactionUtil.cleanInstant(table.getMetaClient(), instant);
+      return;
+    }
+
+    // get clusteringParallelism.
+    int clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS) 
== -1
+        ? clusteringPlan.getInputGroups().size() : 
conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
+
+    // Mark instant as clustering inflight
+    table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, 
Option.empty());
+
+    final Schema tableAvroSchema = 
StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
+    final DataType rowDataType = 
AvroSchemaConverter.convertToDataType(tableAvroSchema);
+    final RowType rowType = (RowType) rowDataType.getLogicalType();
+
+    // setup configuration
+    long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
+    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+
+    DataStream<ClusteringCommitEvent> dataStream = env.addSource(new 
ClusteringPlanSourceFunction(timeline.lastInstant().get(), clusteringPlan))
+        .name("clustering_source")
+        .uid("uid_clustering_source")
+        .rebalance()
+        .transform("clustering_task",
+            TypeInformation.of(ClusteringCommitEvent.class),
+            new ClusteringOperator(conf, rowType))
+        .setParallelism(clusteringPlan.getInputGroups().size());
+
+    ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+        conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+
+    dataStream
+        .addSink(new ClusteringCommitSink(conf))
+        .name("clustering_commit")
+        .uid("uid_clustering_commit")
+        .setParallelism(1);
+
+    env.execute("flink_hudi_clustering");
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index f82712bca2..e9574dd52b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -192,7 +192,7 @@ public class FlinkStreamerConfig extends Configuration {
   public Boolean indexGlobalEnabled = true;
 
   @Parameter(names = {"--index-partition-regex"},
-      description = "Whether to load partitions in state if partition path 
matching, default *")
+      description = "Whether to load partitions in state if partition path 
matching, default *")
   public String indexPartitionRegex = ".*";
 
   @Parameter(names = {"--source-avro-schema-path"}, description = "Source avro 
schema file path, the parsed schema is used for deserialization")
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index b977dfd7c5..fcffbed54b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -37,6 +38,7 @@ import 
org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
@@ -162,6 +164,17 @@ public class StreamerUtil {
             .withPath(conf.getString(FlinkOptions.PATH))
             .combineInput(conf.getBoolean(FlinkOptions.PRE_COMBINE), true)
             
.withMergeAllowDuplicateOnInserts(OptionsResolver.insertClustering(conf))
+            .withClusteringConfig(
+                HoodieClusteringConfig.newBuilder()
+                    
.withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED))
+                    
.withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS))
+                    
.withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS))
+                    
.withClusteringMaxNumGroups(conf.getInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS))
+                    
.withClusteringTargetFileMaxBytes(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES))
+                    
.withClusteringPlanSmallFileLimit(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT)
 * 1024 * 1024L)
+                    
.withClusteringSkipPartitionsFromLatest(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST))
+                    
.withAsyncClusteringMaxCommits(conf.getInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS))
+                    .build())
             .withCompactionConfig(
                 HoodieCompactionConfig.newBuilder()
                     
.withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
@@ -505,6 +518,11 @@ public class StreamerUtil {
    * Returns the max compaction memory in bytes with given conf.
    */
   public static long getMaxCompactionMemoryInBytes(Configuration conf) {
-    return conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024;
+    return (long) conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 
1024;
+  }
+
+  public static Schema getTableAvroSchema(HoodieTableMetaClient metaClient, 
boolean includeMetadataFields) throws Exception {
+    TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
+    return schemaUtil.getTableAvroSchema(includeMetadataFields);
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
new file mode 100644
index 0000000000..ac2ee0be37
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.cluster;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
+import org.apache.hudi.sink.clustering.ClusteringCommitSink;
+import org.apache.hudi.sink.clustering.ClusteringOperator;
+import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
+import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
+import org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestSQL;
+
+import org.apache.avro.Schema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * IT cases for {@link HoodieFlinkClusteringJob}.
+ */
+public class ITTestHoodieFlinkClustering {
+
+  private static final Map<String, String> EXPECTED = new HashMap<>();
+
+  static {
+    EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, 
id2,par1,id2,Stephen,33,2000,par1]");
+    EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, 
id4,par2,id4,Fabian,31,4000,par2]");
+    EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, 
id6,par3,id6,Emma,20,6000,par3]");
+    EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, 
id8,par4,id8,Han,56,8000,par4]");
+  }
+
+  @TempDir
+  File tempFile;
+
+  @Test
+  public void testHoodieFlinkClustering() throws Exception {
+    // Create hoodie table and insert into data.
+    EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+
+    // use append mode
+    options.put(FlinkOptions.OPERATION.key(), 
WriteOperationType.INSERT.value());
+    options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
+
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", 
options);
+    tableEnv.executeSql(hoodieTableDDL);
+    tableEnv.executeSql(TestSQL.INSERT_T1).await();
+
+    // wait for the asynchronous commit to finish
+    TimeUnit.SECONDS.sleep(3);
+
+    // Make configuration and setAvroSchema.
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    cfg.targetPartitions = 4;
+    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+    // create metaClient
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+    // set the table name
+    conf.setString(FlinkOptions.TABLE_NAME, 
metaClient.getTableConfig().getTableName());
+    conf.setString(FlinkOptions.TABLE_TYPE, 
metaClient.getTableConfig().getTableType().name());
+
+    // set record key field
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, 
metaClient.getTableConfig().getRecordKeyFieldProp());
+    // set partition field
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, 
metaClient.getTableConfig().getPartitionFieldProp());
+
+    long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
+    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
+
+    // set table schema
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    // judge whether have operation
+    // To compute the clustering instant time and do clustering.
+    String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, 
null);
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+    boolean scheduled = 
writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+
+    assertTrue(scheduled, "The clustering plan should be scheduled");
+
+    // fetch the instant based on the configured execution sequence
+    table.getMetaClient().reloadActiveTimeline();
+    HoodieTimeline timeline = 
table.getActiveTimeline().filterPendingReplaceTimeline()
+        .filter(instant -> instant.getState() == 
HoodieInstant.State.REQUESTED);
+
+    // generate clustering plan
+    // should support configurable commit metadata
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = 
ClusteringUtils.getClusteringPlan(
+        table.getMetaClient(), timeline.lastInstant().get());
+
+    HoodieClusteringPlan clusteringPlan = 
clusteringPlanOption.get().getRight();
+
+    // Mark instant as clustering inflight
+    HoodieInstant instant = 
HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
+    table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, 
Option.empty());
+
+    final Schema tableAvroSchema = 
StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
+    final DataType rowDataType = 
AvroSchemaConverter.convertToDataType(tableAvroSchema);
+    final RowType rowType = (RowType) rowDataType.getLogicalType();
+
+    DataStream<ClusteringCommitEvent> dataStream = env.addSource(new 
ClusteringPlanSourceFunction(timeline.lastInstant().get(), clusteringPlan))
+        .name("clustering_source")
+        .uid("uid_clustering_source")
+        .rebalance()
+        .transform("clustering_task",
+            TypeInformation.of(ClusteringCommitEvent.class),
+            new ClusteringOperator(conf, rowType))
+        .setParallelism(clusteringPlan.getInputGroups().size());
+
+    ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+        conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+
+    dataStream
+        .addSink(new ClusteringCommitSink(conf))
+        .name("clustering_commit")
+        .uid("uid_clustering_commit")
+        .setParallelism(1);
+
+    env.execute("flink_hudi_clustering");
+    TestData.checkWrittenData(tempFile, EXPECTED, 4);
+  }
+}

Reply via email to