danny0405 commented on a change in pull request #3599:
URL: https://github.com/apache/hudi/pull/3599#discussion_r735249946



##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
##########
@@ -528,6 +528,66 @@ private FlinkOptions() {
       .defaultValue(20)// default min 20 commits
       .withDescription("Min number of commits to keep before archiving older 
commits into a sequential log, default 20");
 
+  // ------------------------------------------------------------------------
+  //  Clustering Options
+  // ------------------------------------------------------------------------
+
+  public static final ConfigOption<Boolean> CLUSTERING_SCHEDULE_ENABLED = 
ConfigOptions
+      .key("clustering.schedule.enabled")
+      .booleanType()
+      .defaultValue(false) // default false for pipeline
+      .withDescription("Async clustering, default false for pipeline");
+
+  public static final ConfigOption<Integer> CLUSTERING_TASKS = ConfigOptions
+      .key("clustering.tasks")
+      .intType()
+      .defaultValue(10)
+      .withDescription("Parallelism of tasks that do actual clustering, 
default is 10");

Review comment:
       Change the default value same with `compaction.tasks`, which is `4`.

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.cluster;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
+import org.apache.hudi.sink.clustering.ClusteringCommitSink;
+import org.apache.hudi.sink.clustering.ClusteringFunction;
+import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
+import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.avro.Schema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ITTestHoodieFlinkClustering {
+
+  private static final Map<String, String> EXPECTED = new HashMap<>();
+
+  static {
+    EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, 
id2,par1,id2,Stephen,33,2000,par1]");
+    EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, 
id4,par2,id4,Fabian,31,4000,par2]");
+    EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, 
id6,par3,id6,Emma,20,6000,par3]");
+    EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, 
id8,par4,id8,Han,56,8000,par4]");
+  }
+
+  @TempDir
+  File tempFile;
+
+  @Test
+  public void testHoodieFlinkClustering() throws Exception {
+    // Create hoodie table and insert into data.
+    EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+
+    // use append mode
+    options.put(FlinkOptions.OPERATION.key(), 
WriteOperationType.INSERT.value());
+    options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
+
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", 
options);
+    tableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 values\n"
+        + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+        + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+        + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
+        + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
+        + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
+        + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
+        + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
+        + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
+    tableEnv.executeSql(insertInto).await();
+
+    // wait for the asynchronous commit to finish
+    TimeUnit.SECONDS.sleep(3);
+
+    // Make configuration and setAvroSchema.
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    cfg.targetPartitions = 4;
+    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+    // create metaClient
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+    // set the table name
+    conf.setString(FlinkOptions.TABLE_NAME, 
metaClient.getTableConfig().getTableName());
+    conf.setString(FlinkOptions.TABLE_TYPE, 
metaClient.getTableConfig().getTableType().name());
+
+    // set record key field
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, 
metaClient.getTableConfig().getRecordKeyFieldProp());
+    // set partition field
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, 
metaClient.getTableConfig().getPartitionFieldProp());
+
+    long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
+    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
+
+    // set table schema
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    // judge whether have operation
+    // To compute the clustering instant time and do clustering.
+    String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, 
null);
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+    boolean scheduled = 
writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+
+    assertTrue(scheduled, "The clustering plan should be scheduled");
+
+    // fetch the instant based on the configured execution sequence
+    table.getMetaClient().reloadActiveTimeline();
+    HoodieTimeline timeline = 
table.getActiveTimeline().filterPendingReplaceTimeline()
+        .filter(instant -> instant.getState() == 
HoodieInstant.State.REQUESTED);
+

Review comment:
       How could we distinguish that the replace commit does not come from 
`insert overwrite ...`.

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.configuration.FlinkOptions;
+
+import com.beust.jcommander.Parameter;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Configurations for Hoodie Flink clustering.
+ */
+public class FlinkClusteringConfig extends Configuration {
+
+  @Parameter(names = {"--help", "-h"}, help = true)
+  public Boolean help = false;
+
+  // ------------------------------------------------------------------------
+  //  Hudi Write Options
+  // ------------------------------------------------------------------------
+
+  @Parameter(names = {"--path"}, description = "Base path for the target 
hoodie table.", required = true)
+  public String path;
+
+  // ------------------------------------------------------------------------
+  //  Clustering Options
+  // ------------------------------------------------------------------------
+  @Parameter(names = {"--clustering-tasks"}, description = "Parallelism of 
tasks that do actual clustering, default is -1", required = false)
+  public Integer clusteringTasks = -1;
+
+  @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in 
MB for compaction spillable map, default 100MB.", required = false)
+  public Integer compactionMaxMemory = 100;
+
+  @Parameter(names = {"--clean-retain-commits"},
+      description = "Number of commits to retain. So data will be retained for 
num_of_commits * time_between_commits (scheduled).\n"
+          + "This also directly translates into how much you can incrementally 
pull on this table, default 10",
+      required = false)
+  public Integer cleanRetainCommits = 10;
+
+  @Parameter(names = {"--archive-min-commits"},
+      description = "Min number of commits to keep before archiving older 
commits into a sequential log, default 20.",
+      required = false)
+  public Integer archiveMinCommits = 20;
+
+  @Parameter(names = {"--archive-max-commits"},
+      description = "Max number of commits to keep before archiving older 
commits into a sequential log, default 30.",
+      required = false)
+  public Integer archiveMaxCommits = 30;
+
+  @Parameter(names = {"--schedule", "-sc"}, description = "Not recommended. 
Schedule the clustering plan in this job.\n"
+      + "There is a risk of losing data when scheduling clustering outside the 
writer job.\n"
+      + "Scheduling clustering in the writer job and only let this job do the 
clustering execution is recommended.\n"
+      + "Default is true", required = false)
+  public Boolean schedule = true;
+
+  @Parameter(names = {"--clean-async-enabled"}, description = "Whether to 
cleanup the old commits immediately on new commits, enabled by default", 
required = false)
+  public Boolean cleanAsyncEnable = false;
+
+  @Parameter(names = {"--plan-strategy-class"}, description = "Config to 
provide a strategy class to generator clustering plan", required = false)
+  public String planStrategyClass = 
"org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy";
+
+  @Parameter(names = {"--target-file-max-bytes"}, description = "Each group 
can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output 
file groups, default 1 GB", required = false)
+  public Integer targetFileMaxBytes = 1024;
+
+  @Parameter(names = {"--small-file-limit"}, description = "Files smaller than 
the size specified here are candidates for clustering, default 600 MB", 
required = false)
+  public Integer smallFileLimit = 600;
+
+  @Parameter(names = {"--skip-from-latest-partitions"}, description = "Number 
of partitions to skip from latest when choosing partitions to create 
ClusteringPlan, default 0", required = false)
+  public Integer skipFromLatestPartitions = 0;
+
+  @Parameter(names = {"--sort-columns"}, description = "Columns to sort the 
data by when clustering, default `uuid`", required = false)
+  public String sortColumns;
+
+  @Parameter(names = {"--max-num-groups"}, description = "Maximum number of 
groups to create as part of ClusteringPlan. Increasing groups will increase 
parallelism. default 30", required = false)
+  public Integer maxNumGroups = 30;
+
+  @Parameter(names = {"--target-partitions"}, description = "Number of 
partitions to list to create ClusteringPlan, default 2", required = false)
+  public Integer targetPartitions = 2;
+
+  public static final String SEQ_FIFO = "FIFO";
+  public static final String SEQ_LIFO = "LIFO";
+  @Parameter(names = {"--seq"}, description = "Clustering plan execution 
sequence, two options are supported:\n"
+      + "1). FIFO: execute the oldest plan first;\n"
+      + "2). LIFO: execute the latest plan first, by default LIFO", required = 
false)
+  public String clusteringSeq = SEQ_LIFO;
+
+  @Parameter(names = {"--write-partition-url-encode"}, description = "Whether 
to encode the partition path url, default false")
+  public Boolean writePartitionUrlEncode = false;
+
+  @Parameter(names = {"--hive-style-partitioning"}, description = "Whether to 
use Hive style partitioning.\n"
+      + "If set true, the names of partition folders follow 
<partition_column_name>=<partition_value> format.\n"
+      + "By default false (the names of partition folders are only partition 
values)")
+  public Boolean hiveStylePartitioning = false;
+
+  /**
+   * Transforms a {@code FlinkClusteringConfig.config} into {@code 
Configuration}.
+   * The latter is more suitable for the table APIs. It reads all the 
properties
+   * in the properties file (set by `--props` option) and cmd line options
+   * (set by `--hoodie-conf` option).
+   */
+  public static Configuration toFlinkConfig(FlinkClusteringConfig config) {
+    Configuration conf = new Configuration();
+
+    conf.setString(FlinkOptions.PATH, config.path);
+    conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, 
config.archiveMaxCommits);
+    conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, 
config.archiveMinCommits);
+    conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, 
config.cleanRetainCommits);
+    conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, 
config.compactionMaxMemory);
+    conf.setInteger(FlinkOptions.CLUSTERING_TASKS, config.clusteringTasks);
+    conf.setString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS, 
config.planStrategyClass);
+    
conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES, 
config.targetFileMaxBytes);
+    conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT, 
config.smallFileLimit);
+    
conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST,
 config.skipFromLatestPartitions);
+    if (config.sortColumns != null) {
+      conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, config.sortColumns);
+    }

Review comment:
       Where is the `CLUSTERING_SORT_COLUMNS` used for ?

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/cluster/FlinkClusteringPlanActionExecutor.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+@SuppressWarnings("checkstyle:LineLength")
+public class FlinkClusteringPlanActionExecutor<T extends HoodieRecordPayload> 
extends
+    BaseClusteringPlanActionExecutor<T, List<HoodieRecord<T>>, 
List<HoodieKey>, List<WriteStatus>> {
+
+  private static final Logger LOG = 
LogManager.getLogger(FlinkClusteringPlanActionExecutor.class);
+
+  public FlinkClusteringPlanActionExecutor(HoodieEngineContext context,

Review comment:
       We can use the `HoodieData` to merge the code with 
`SparkClusteringPlanActionExecutor`, this can be done with separate following 
PR.

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;
+import 
org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
+
+/**
+ * Clustering Strategy based on following.
+ * 1) Only looks at latest 'daybased.lookback.partitions' partitions.
+ * 2) Excludes files that are greater than 'small.file.limit' from clustering 
plan.
+ */
+public class FlinkRecentDaysClusteringPlanStrategy<T extends 
HoodieRecordPayload<T>>
+    extends PartitionAwareClusteringPlanStrategy<T, List<HoodieRecord<T>>, 
List<HoodieKey>, List<WriteStatus>> {
+  private static final Logger LOG = 
LogManager.getLogger(FlinkRecentDaysClusteringPlanStrategy.class);

Review comment:
       Should we extend from `FlinkSizeBasedClusteringPlanStrategy` instead ?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -489,4 +501,9 @@ public static String 
getLastCompletedInstant(HoodieTableMetaClient metaClient) {
   public static boolean haveSuccessfulCommits(HoodieTableMetaClient 
metaClient) {
     return !metaClient.getCommitsTimeline().filterCompletedInstants().empty();
   }
+
+  public static Schema getTableAvroSchema(HoodieFlinkTable<?> table, 
Configuration conf, boolean includeMetadataFields) throws Exception {
+    TableSchemaResolver schemaUtil = new 
TableSchemaResolver(table.getMetaClient(), 
conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
+    return schemaUtil.getTableAvroSchema(includeMetadataFields);

Review comment:
       Change the first param as `HoodieTableMetaClient`

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
##########
@@ -528,6 +528,66 @@ private FlinkOptions() {
       .defaultValue(20)// default min 20 commits
       .withDescription("Min number of commits to keep before archiving older 
commits into a sequential log, default 20");
 
+  // ------------------------------------------------------------------------
+  //  Clustering Options
+  // ------------------------------------------------------------------------
+
+  public static final ConfigOption<Boolean> CLUSTERING_SCHEDULE_ENABLED = 
ConfigOptions
+      .key("clustering.schedule.enabled")
+      .booleanType()
+      .defaultValue(false) // default false for pipeline
+      .withDescription("Async clustering, default false for pipeline");

Review comment:
       Schedule the compaction plan, default false

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.cluster;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
+import org.apache.hudi.sink.clustering.ClusteringCommitSink;
+import org.apache.hudi.sink.clustering.ClusteringFunction;
+import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
+import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.avro.Schema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ITTestHoodieFlinkClustering {
+
+  private static final Map<String, String> EXPECTED = new HashMap<>();
+
+  static {
+    EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, 
id2,par1,id2,Stephen,33,2000,par1]");
+    EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, 
id4,par2,id4,Fabian,31,4000,par2]");
+    EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, 
id6,par3,id6,Emma,20,6000,par3]");
+    EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, 
id8,par4,id8,Han,56,8000,par4]");
+  }
+
+  @TempDir
+  File tempFile;
+
+  @Test
+  public void testHoodieFlinkClustering() throws Exception {
+    // Create hoodie table and insert into data.
+    EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+
+    // use append mode
+    options.put(FlinkOptions.OPERATION.key(), 
WriteOperationType.INSERT.value());
+    options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
+
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", 
options);
+    tableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 values\n"
+        + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+        + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+        + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
+        + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
+        + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
+        + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
+        + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
+        + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
+    tableEnv.executeSql(insertInto).await();
+
+    // wait for the asynchronous commit to finish
+    TimeUnit.SECONDS.sleep(3);
+
+    // Make configuration and setAvroSchema.
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    cfg.targetPartitions = 4;
+    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+    // create metaClient
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+    // set the table name
+    conf.setString(FlinkOptions.TABLE_NAME, 
metaClient.getTableConfig().getTableName());
+    conf.setString(FlinkOptions.TABLE_TYPE, 
metaClient.getTableConfig().getTableType().name());
+
+    // set record key field
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, 
metaClient.getTableConfig().getRecordKeyFieldProp());
+    // set partition field
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, 
metaClient.getTableConfig().getPartitionFieldProp());
+
+    long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
+    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
+
+    // set table schema
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    // judge whether have operation
+    // To compute the clustering instant time and do clustering.
+    String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, 
null);
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+    boolean scheduled = 
writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+
+    assertTrue(scheduled, "The clustering plan should be scheduled");
+
+    // fetch the instant based on the configured execution sequence
+    table.getMetaClient().reloadActiveTimeline();
+    HoodieTimeline timeline = 
table.getActiveTimeline().filterPendingReplaceTimeline()
+        .filter(instant -> instant.getState() == 
HoodieInstant.State.REQUESTED);
+
+    // generate clustering plan
+    // should support configurable commit metadata
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = 
ClusteringUtils.getClusteringPlan(
+        table.getMetaClient(), timeline.lastInstant().get());
+
+    HoodieClusteringPlan clusteringPlan = 
clusteringPlanOption.get().getRight();
+
+    final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table, 
conf, false);
+    final DataType rowDataType = 
AvroSchemaConverter.convertToDataType(tableAvroSchema);
+    final RowType rowType = (RowType) rowDataType.getLogicalType();
+
+    env.addSource(new ClusteringPlanSourceFunction(table, 
timeline.lastInstant().get(), clusteringPlan))
+        .name("clustering_source")
+        .uid("uid_clustering_source")
+        .rebalance()
+        .transform("clustering_task",
+            TypeInformation.of(ClusteringCommitEvent.class),
+            new ProcessOperator<>(new ClusteringFunction(conf, rowType)))
+        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
+        .addSink(new ClusteringCommitSink(conf))

Review comment:
       Why the parallelism set as `WRITE_TASKS`, and can we set a default 
parallelism as the `HoodieFlinkCompactor` ?

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
##########
@@ -528,6 +528,66 @@ private FlinkOptions() {
       .defaultValue(20)// default min 20 commits
       .withDescription("Min number of commits to keep before archiving older 
commits into a sequential log, default 20");
 
+  // ------------------------------------------------------------------------
+  //  Clustering Options
+  // ------------------------------------------------------------------------
+
+  public static final ConfigOption<Boolean> CLUSTERING_SCHEDULE_ENABLED = 
ConfigOptions
+      .key("clustering.schedule.enabled")
+      .booleanType()
+      .defaultValue(false) // default false for pipeline
+      .withDescription("Async clustering, default false for pipeline");
+
+  public static final ConfigOption<Integer> CLUSTERING_TASKS = ConfigOptions
+      .key("clustering.tasks")
+      .intType()
+      .defaultValue(10)
+      .withDescription("Parallelism of tasks that do actual clustering, 
default is 10");
+
+  public static final ConfigOption<Integer> CLUSTERING_TARGET_PARTITIONS = 
ConfigOptions
+      .key("clustering.plan.strategy.daybased.lookback.partitions")
+      .intType()
+      .defaultValue(2)
+      .withDescription("Number of partitions to list to create 
ClusteringPlan");
+
+  public static final ConfigOption<String> CLUSTERING_PLAN_STRATEGY_CLASS = 
ConfigOptions
+      .key("clustering.plan.strategy.class")
+      .stringType()
+      
.defaultValue("org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy")
+      .withDescription("Config to provide a strategy class (subclass of 
ClusteringPlanStrategy) to create clustering plan "
+          + "i.e select what file groups are being clustered. Default 
strategy, looks at the last N (determined by "
+          + CLUSTERING_TARGET_PARTITIONS.key() + ") day based partitions picks 
the small file slices within those partitions.");
+
+  public static final ConfigOption<Integer> 
CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
+      .key("clustering.plan.strategy.target.file.max.bytes")
+      .intType()
+      .defaultValue(1024 * 1024 * 1024) // default 1 GB
+      .withDescription("Each group can produce 'N' 
(CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, 
default 1 GB");
+
+  public static final ConfigOption<Integer> 
CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions
+      .key("clustering.plan.strategy.small.file.limit")
+      .intType()
+      .defaultValue(600) // default 600 MB
+      .withDescription("Files smaller than the size specified here are 
candidates for clustering, default 600 MB");
+
+  public static final ConfigOption<Integer> 
CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigOptions
+      .key("clustering.plan.strategy.daybased.skipfromlatest.partitions")
+      .intType()
+      .defaultValue(0)
+      .withDescription("Number of partitions to skip from latest when choosing 
partitions to create ClusteringPlan");
+
+  public static final ConfigOption<String> CLUSTERING_SORT_COLUMNS = 
ConfigOptions
+      .key("clustering.plan.strategy.sort.columns")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("Columns to sort the data by when clustering");
+
+  public static final ConfigOption<Integer> CLUSTERING_MAX_NUM_GROUPS = 
ConfigOptions
+      .key("clustering.plan.strategy.max.num.groups")
+      .intType()
+      .defaultValue(30)
+      .withDescription("Maximum number of groups to create as part of 
ClusteringPlan. Increasing groups will increase parallelism");
+

Review comment:
       , default 30

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -409,6 +413,40 @@ private void writeTableMetadata(HoodieTable<T, 
List<HoodieRecord<T>>, List<Hoodi
     }
   }
 
+  private void completeClustering(
+      HoodieReplaceCommitMetadata metadata, List<WriteStatus> writeStatuses,
+      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> table,
+      String clusteringCommitTime) {
+    List<HoodieWriteStat> writeStats = 
metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
+        e.getValue().stream()).collect(Collectors.toList());
+    if (writeStats.stream().mapToLong(s -> s.getTotalWriteErrors()).sum() > 0) 
{
+      throw new HoodieClusteringException("Clustering failed to write to 
files:"
+          + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 
0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
+    }

Review comment:
       `writeTableMetadata` is missing.

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.table.HoodieFlinkTable;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flink hudi clustering source function.
+ *
+ * <P>This function read the clustering plan as {@link ClusteringOperation}s 
then assign the clustering task
+ * event {@link ClusteringPlanEvent} to downstream operators.
+ *
+ * <p>The clustering instant time is specified explicitly with strategies:
+ *
+ * <ul>
+ *   <li>If the timeline has no inflight instants,
+ *   use {@link 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()}
+ *   as the instant time;</li>
+ *   <li>If the timeline has inflight instants,
+ *   use the median instant time between [last complete instant time, earliest 
inflight instant time]
+ *   as the instant time.</li>
+ * </ul>
+ */
+public class ClusteringPlanSourceFunction extends AbstractRichFunction 
implements SourceFunction<ClusteringPlanEvent> {
+
+  protected static final Logger LOG = 
LoggerFactory.getLogger(ClusteringPlanSourceFunction.class);
+
+  /**
+   * Hoodie flink table.
+   */
+  private final HoodieFlinkTable<?> table;
+
+  /**
+   * The clustering plan.
+   */
+  private final HoodieClusteringPlan clusteringPlan;
+
+  /**
+   * Hoodie instant.
+   */
+  private final HoodieInstant instant;
+
+  public ClusteringPlanSourceFunction(HoodieFlinkTable<?> table, HoodieInstant 
instant, HoodieClusteringPlan clusteringPlan) {
+    this.table = table;
+    this.instant = instant;
+    this.clusteringPlan = clusteringPlan;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    // no operation
+  }
+
+  @Override
+  public void run(SourceContext<ClusteringPlanEvent> sourceContext) throws 
Exception {
+    // Mark instant as clustering inflight
+    table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, 
Option.empty());
+    table.getMetaClient().reloadActiveTimeline();
+

Review comment:
       Move the `transitionReplaceRequestedToInflight` out of the function into 
the `HoodieFlinkClusteringJob#main` and remove the member variable `table`.

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+
+import com.beust.jcommander.JCommander;
+import org.apache.avro.Schema;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flink hudi clustering program that can be executed manually.
+ */
+public class HoodieFlinkClusteringJob {
+
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HoodieFlinkClusteringJob.class);
+
+  public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+
+    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+    // create metaClient
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+    // set table name
+    conf.setString(FlinkOptions.TABLE_NAME, 
metaClient.getTableConfig().getTableName());
+
+    // set table type
+    conf.setString(FlinkOptions.TABLE_TYPE, 
metaClient.getTableConfig().getTableType().name());
+
+    // set record key field
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, 
metaClient.getTableConfig().getRecordKeyFieldProp());
+
+    // set partition field
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, 
metaClient.getTableConfig().getPartitionFieldProp());
+
+    // set table schema
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, 
null);
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();

Review comment:
       Should use `StreamerUtil.createWriteClient(conf)` instead to start the 
embedded timeline server.

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitEvent.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.client.WriteStatus;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Represents a commit event from the clustering task {@link 
ClusteringFunction}.
+ */
+public class ClusteringCommitEvent implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The clustering commit instant time.
+   */
+  private String instant;
+  /**
+   * The write statuses.
+   */
+  private List<WriteStatus> writeStatuses;
+  /**
+   * The clustering task identifier.
+   */
+  private int taskID;
+

Review comment:
       The event should include a `fileId` to deduplicate for tasks 
failover/retry. Take `CompactionCommitEvent` as a reference. Because there are 
multiple input file ids for a `HoodieClusteringGroup` thus the 
`CompactionCommitEvent`, we can use the first file group id to distinguish.

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringFunction.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.HoodieFileSliceReader;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
+
+/**
+ * Function to execute the actual clustering task assigned by the clustering 
plan task.
+ * In order to execute scalable, the input should shuffle by the clustering 
event {@link ClusteringPlanEvent}.
+ */
+public class ClusteringFunction extends ProcessFunction<ClusteringPlanEvent, 
ClusteringCommitEvent> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ClusteringFunction.class);
+
+  /**
+   * Config options.
+   */
+  private final Configuration conf;
+
+  private final RowType rowType;
+
+  /**
+   * Id of current subtask.
+   */
+  private int taskID;
+
+  private transient HoodieWriteConfig writeConfig;
+
+  private transient HoodieFlinkTable<?> table;
+
+  private transient Schema schema;
+
+  private transient Schema readerSchema;
+
+  private transient int[] requiredPos;
+
+  private transient AvroToRowDataConverters.AvroToRowDataConverter 
avroToRowDataConverter;
+
+  private transient BulkInsertWriterHelper writerHelper;
+
+  public ClusteringFunction(Configuration conf, RowType rowType) {
+    this.conf = conf;
+    this.rowType = rowType;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+    this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, 
getRuntimeContext());
+    this.table = writeClient.getHoodieTable();
+
+    this.schema = new Schema.Parser().parse(writeConfig.getSchema());
+    this.readerSchema = HoodieAvroUtils.addMetadataFields(this.schema);
+    this.requiredPos = getRequiredPositions();
+
+    this.avroToRowDataConverter = 
AvroToRowDataConverters.createRowConverter(rowType);
+  }
+
+  @Override
+  public void processElement(ClusteringPlanEvent event, Context context, 
Collector<ClusteringCommitEvent> collector) throws Exception {
+    final String instantTime = event.getClusteringInstantTime();
+    final HoodieClusteringGroup clusteringGroup = event.getClusteringGroup();
+
+    initWriterHelper(instantTime);
+
+    // executes the clustering task synchronously for batch mode.
+    LOG.info("Execute clustering for instant {} from task {}", instantTime, 
taskID);
+    doClustering(instantTime, clusteringGroup, collector);
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private void initWriterHelper(String clusteringInstantTime) {
+    if (this.writerHelper == null) {
+      this.writerHelper = new BulkInsertWriterHelper(this.conf, this.table, 
this.writeConfig,
+          clusteringInstantTime, this.taskID, 
getRuntimeContext().getNumberOfParallelSubtasks(), 
getRuntimeContext().getAttemptNumber(),
+          this.rowType);
+    }
+  }
+
+  private void doClustering(String instantTime, HoodieClusteringGroup 
clusteringGroup, Collector<ClusteringCommitEvent> collector) throws IOException 
{
+    List<ClusteringOperation> clusteringOps = 
clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
+    boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> 
op.getDeltaFilePaths().size() > 0);
+

Review comment:
       The `HoodieClusteringGroup` has num of output file groups, the current 
code has only one file group (or more if the parquet size hits the threshold), 
can we find a way to set up the parallelism of bulk_insert writer as that ?

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringFunction.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.HoodieFileSliceReader;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
+
+/**
+ * Function to execute the actual clustering task assigned by the clustering 
plan task.
+ * In order to execute scalable, the input should shuffle by the clustering 
event {@link ClusteringPlanEvent}.
+ */
+public class ClusteringFunction extends ProcessFunction<ClusteringPlanEvent, 
ClusteringCommitEvent> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ClusteringFunction.class);
+
+  /**
+   * Config options.
+   */
+  private final Configuration conf;
+
+  private final RowType rowType;
+
+  /**
+   * Id of current subtask.
+   */
+  private int taskID;
+
+  private transient HoodieWriteConfig writeConfig;
+
+  private transient HoodieFlinkTable<?> table;
+
+  private transient Schema schema;
+
+  private transient Schema readerSchema;
+
+  private transient int[] requiredPos;
+
+  private transient AvroToRowDataConverters.AvroToRowDataConverter 
avroToRowDataConverter;
+
+  private transient BulkInsertWriterHelper writerHelper;
+
+  public ClusteringFunction(Configuration conf, RowType rowType) {
+    this.conf = conf;
+    this.rowType = rowType;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+    this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, 
getRuntimeContext());
+    this.table = writeClient.getHoodieTable();
+
+    this.schema = new Schema.Parser().parse(writeConfig.getSchema());
+    this.readerSchema = HoodieAvroUtils.addMetadataFields(this.schema);
+    this.requiredPos = getRequiredPositions();
+

Review comment:
       Where is the `requiredPos` used for ?

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.cluster;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
+import org.apache.hudi.sink.clustering.ClusteringCommitSink;
+import org.apache.hudi.sink.clustering.ClusteringFunction;
+import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
+import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.avro.Schema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ITTestHoodieFlinkClustering {
+
+  private static final Map<String, String> EXPECTED = new HashMap<>();
+
+  static {
+    EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, 
id2,par1,id2,Stephen,33,2000,par1]");
+    EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, 
id4,par2,id4,Fabian,31,4000,par2]");
+    EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, 
id6,par3,id6,Emma,20,6000,par3]");
+    EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, 
id8,par4,id8,Han,56,8000,par4]");
+  }
+
+  @TempDir
+  File tempFile;
+
+  @Test
+  public void testHoodieFlinkClustering() throws Exception {
+    // Create hoodie table and insert into data.
+    EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+
+    // use append mode
+    options.put(FlinkOptions.OPERATION.key(), 
WriteOperationType.INSERT.value());
+    options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
+
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", 
options);
+    tableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 values\n"
+        + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+        + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"

Review comment:
       Use `TestSql.INSERT_T1` instead.

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
##########
@@ -528,6 +528,66 @@ private FlinkOptions() {
       .defaultValue(20)// default min 20 commits
       .withDescription("Min number of commits to keep before archiving older 
commits into a sequential log, default 20");
 
+  // ------------------------------------------------------------------------
+  //  Clustering Options
+  // ------------------------------------------------------------------------
+
+  public static final ConfigOption<Boolean> CLUSTERING_SCHEDULE_ENABLED = 
ConfigOptions
+      .key("clustering.schedule.enabled")
+      .booleanType()
+      .defaultValue(false) // default false for pipeline
+      .withDescription("Async clustering, default false for pipeline");
+
+  public static final ConfigOption<Integer> CLUSTERING_TASKS = ConfigOptions
+      .key("clustering.tasks")
+      .intType()
+      .defaultValue(10)
+      .withDescription("Parallelism of tasks that do actual clustering, 
default is 10");
+
+  public static final ConfigOption<Integer> CLUSTERING_TARGET_PARTITIONS = 
ConfigOptions
+      .key("clustering.plan.strategy.daybased.lookback.partitions")
+      .intType()
+      .defaultValue(2)
+      .withDescription("Number of partitions to list to create 
ClusteringPlan");
+

Review comment:
       , default 2

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.cluster;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
+import org.apache.hudi.sink.clustering.ClusteringCommitSink;
+import org.apache.hudi.sink.clustering.ClusteringFunction;
+import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
+import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.avro.Schema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ITTestHoodieFlinkClustering {

Review comment:
       /**
    * IT cases for {@link HoodieFlinkClusteringJob}.
    */

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringFunction.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.HoodieFileSliceReader;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
+
+/**
+ * Function to execute the actual clustering task assigned by the clustering 
plan task.
+ * In order to execute scalable, the input should shuffle by the clustering 
event {@link ClusteringPlanEvent}.
+ */
+public class ClusteringFunction extends ProcessFunction<ClusteringPlanEvent, 
ClusteringCommitEvent> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ClusteringFunction.class);
+
+  /**
+   * Config options.
+   */
+  private final Configuration conf;
+
+  private final RowType rowType;
+
+  /**
+   * Id of current subtask.
+   */
+  private int taskID;
+
+  private transient HoodieWriteConfig writeConfig;
+
+  private transient HoodieFlinkTable<?> table;
+
+  private transient Schema schema;
+
+  private transient Schema readerSchema;
+
+  private transient int[] requiredPos;
+
+  private transient AvroToRowDataConverters.AvroToRowDataConverter 
avroToRowDataConverter;
+
+  private transient BulkInsertWriterHelper writerHelper;
+
+  public ClusteringFunction(Configuration conf, RowType rowType) {
+    this.conf = conf;
+    this.rowType = rowType;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+    this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, 
getRuntimeContext());
+    this.table = writeClient.getHoodieTable();
+
+    this.schema = new Schema.Parser().parse(writeConfig.getSchema());
+    this.readerSchema = HoodieAvroUtils.addMetadataFields(this.schema);
+    this.requiredPos = getRequiredPositions();
+
+    this.avroToRowDataConverter = 
AvroToRowDataConverters.createRowConverter(rowType);
+  }
+
+  @Override
+  public void processElement(ClusteringPlanEvent event, Context context, 
Collector<ClusteringCommitEvent> collector) throws Exception {
+    final String instantTime = event.getClusteringInstantTime();
+    final HoodieClusteringGroup clusteringGroup = event.getClusteringGroup();
+
+    initWriterHelper(instantTime);
+
+    // executes the clustering task synchronously for batch mode.
+    LOG.info("Execute clustering for instant {} from task {}", instantTime, 
taskID);
+    doClustering(instantTime, clusteringGroup, collector);
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private void initWriterHelper(String clusteringInstantTime) {
+    if (this.writerHelper == null) {
+      this.writerHelper = new BulkInsertWriterHelper(this.conf, this.table, 
this.writeConfig,
+          clusteringInstantTime, this.taskID, 
getRuntimeContext().getNumberOfParallelSubtasks(), 
getRuntimeContext().getAttemptNumber(),
+          this.rowType);
+    }
+  }
+
+  private void doClustering(String instantTime, HoodieClusteringGroup 
clusteringGroup, Collector<ClusteringCommitEvent> collector) throws IOException 
{
+    List<ClusteringOperation> clusteringOps = 
clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
+    boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> 
op.getDeltaFilePaths().size() > 0);
+
+    Iterator<RowData> iterator;
+    if (hasLogFiles) {
+      // if there are log files, we read all records into memory for a file 
group and apply updates.
+      iterator = readRecordsForGroupWithLogs(clusteringOps, instantTime);
+    } else {
+      // We want to optimize reading records for case there are no log files.
+      iterator = readRecordsForGroupBaseFiles(clusteringOps);
+    }
+
+    while (iterator.hasNext()) {
+      this.writerHelper.write(iterator.next());
+    }
+    List<WriteStatus> writeStatuses = 
this.writerHelper.getWriteStatuses(this.taskID);
+    collector.collect(new ClusteringCommitEvent(instantTime, writeStatuses, 
this.taskID));
+  }
+
+  /**
+   * Read records from baseFiles, apply updates and convert to Iterator.
+   */
+  @SuppressWarnings("unchecked")
+  private Iterator<RowData> 
readRecordsForGroupWithLogs(List<ClusteringOperation> clusteringOps, String 
instantTime) {
+    List<Iterator<RowData>> recordIterators = new ArrayList<>();
+
+    long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new 
FlinkTaskContextSupplier(null), writeConfig);
+    LOG.info("MaxMemoryPerCompaction run as part of clustering => " + 
maxMemoryPerCompaction);
+
+    for (ClusteringOperation clusteringOp : clusteringOps) {
+      try {
+        Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(writeConfig.getSchema()));
+        HoodieFileReader<? extends IndexedRecord> baseFileReader = 
HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new 
Path(clusteringOp.getDataFilePath()));

Review comment:
       Use the `readerSchema` ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to