danny0405 commented on a change in pull request #3599:
URL: https://github.com/apache/hudi/pull/3599#discussion_r735249946
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
##########
@@ -528,6 +528,66 @@ private FlinkOptions() {
       .defaultValue(20)// default min 20 commits
       .withDescription("Min number of commits to keep before archiving older commits into a sequential log, default 20");
+  // ------------------------------------------------------------------------
+  //  Clustering Options
+  // ------------------------------------------------------------------------
+
+  public static final ConfigOption<Boolean> CLUSTERING_SCHEDULE_ENABLED = ConfigOptions
+      .key("clustering.schedule.enabled")
+      .booleanType()
+      .defaultValue(false) // default false for pipeline
+      .withDescription("Async clustering, default false for pipeline");
+
+  public static final ConfigOption<Integer> CLUSTERING_TASKS = ConfigOptions
+      .key("clustering.tasks")
+      .intType()
+      .defaultValue(10)
+      .withDescription("Parallelism of tasks that do actual clustering, default is 10");
Review comment:
   Change the default value to match `compaction.tasks`, which is `4`.
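
   For example (same option as in the diff above, with only the default and description changed):

   ```java
   public static final ConfigOption<Integer> CLUSTERING_TASKS = ConfigOptions
       .key("clustering.tasks")
       .intType()
       .defaultValue(4) // same default as compaction.tasks
       .withDescription("Parallelism of tasks that do actual clustering, default is 4");
   ```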
##########
File path:
hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.cluster;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
+import org.apache.hudi.sink.clustering.ClusteringCommitSink;
+import org.apache.hudi.sink.clustering.ClusteringFunction;
+import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
+import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.avro.Schema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ITTestHoodieFlinkClustering {
+
+ private static final Map<String, String> EXPECTED = new HashMap<>();
+
+ static {
+ EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1,
id2,par1,id2,Stephen,33,2000,par1]");
+ EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2,
id4,par2,id4,Fabian,31,4000,par2]");
+ EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3,
id6,par3,id6,Emma,20,6000,par3]");
+ EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4,
id8,par4,id8,Han,56,8000,par4]");
+ }
+
+ @TempDir
+ File tempFile;
+
+ @Test
+ public void testHoodieFlinkClustering() throws Exception {
+ // Create hoodie table and insert into data.
+ EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
+ TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+ tableEnv.getConfig().getConfiguration()
+
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+
+ // use append mode
+ options.put(FlinkOptions.OPERATION.key(),
WriteOperationType.INSERT.value());
+ options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
+
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1",
options);
+ tableEnv.executeSql(hoodieTableDDL);
+ String insertInto = "insert into t1 values\n"
+ + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+ + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+ + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
+ + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
+ + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
+ + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
+ + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
+ + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
+ tableEnv.executeSql(insertInto).await();
+
+ // wait for the asynchronous commit to finish
+ TimeUnit.SECONDS.sleep(3);
+
+ // Make configuration and setAvroSchema.
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+ cfg.path = tempFile.getAbsolutePath();
+ cfg.targetPartitions = 4;
+ Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+ // create metaClient
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+ // set the table name
+ conf.setString(FlinkOptions.TABLE_NAME,
metaClient.getTableConfig().getTableName());
+ conf.setString(FlinkOptions.TABLE_TYPE,
metaClient.getTableConfig().getTableType().name());
+
+ // set record key field
+ conf.setString(FlinkOptions.RECORD_KEY_FIELD,
metaClient.getTableConfig().getRecordKeyFieldProp());
+ // set partition field
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD,
metaClient.getTableConfig().getPartitionFieldProp());
+
+ long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
+ conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
+
+ // set table schema
+ CompactionUtil.setAvroSchema(conf, metaClient);
+
+ // judge whether have operation
+ // To compute the clustering instant time and do clustering.
+ String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+
+ HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf,
null);
+ HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+    boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+
+    assertTrue(scheduled, "The clustering plan should be scheduled");
+
+    // fetch the instant based on the configured execution sequence
+    table.getMetaClient().reloadActiveTimeline();
+    HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline()
+        .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
+
Review comment:
   How can we distinguish that this replace commit does not come from
`insert overwrite ...`?
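
   One possible check (a sketch reusing `ClusteringUtils.getClusteringPlan`, which this PR already calls below) is to keep only the pending replace instants that actually carry a clustering plan:

   ```java
   HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline()
       .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED)
       .filter(instant -> ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant).isPresent());
   ```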
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.configuration.FlinkOptions;
+
+import com.beust.jcommander.Parameter;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Configurations for Hoodie Flink clustering.
+ */
+public class FlinkClusteringConfig extends Configuration {
+
+ @Parameter(names = {"--help", "-h"}, help = true)
+ public Boolean help = false;
+
+ // ------------------------------------------------------------------------
+ // Hudi Write Options
+ // ------------------------------------------------------------------------
+
+ @Parameter(names = {"--path"}, description = "Base path for the target
hoodie table.", required = true)
+ public String path;
+
+ // ------------------------------------------------------------------------
+ // Clustering Options
+ // ------------------------------------------------------------------------
+ @Parameter(names = {"--clustering-tasks"}, description = "Parallelism of
tasks that do actual clustering, default is -1", required = false)
+ public Integer clusteringTasks = -1;
+
+ @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in
MB for compaction spillable map, default 100MB.", required = false)
+ public Integer compactionMaxMemory = 100;
+
+ @Parameter(names = {"--clean-retain-commits"},
+ description = "Number of commits to retain. So data will be retained for
num_of_commits * time_between_commits (scheduled).\n"
+ + "This also directly translates into how much you can incrementally
pull on this table, default 10",
+ required = false)
+ public Integer cleanRetainCommits = 10;
+
+ @Parameter(names = {"--archive-min-commits"},
+ description = "Min number of commits to keep before archiving older
commits into a sequential log, default 20.",
+ required = false)
+ public Integer archiveMinCommits = 20;
+
+ @Parameter(names = {"--archive-max-commits"},
+ description = "Max number of commits to keep before archiving older
commits into a sequential log, default 30.",
+ required = false)
+ public Integer archiveMaxCommits = 30;
+
+ @Parameter(names = {"--schedule", "-sc"}, description = "Not recommended.
Schedule the clustering plan in this job.\n"
+ + "There is a risk of losing data when scheduling clustering outside the
writer job.\n"
+ + "Scheduling clustering in the writer job and only let this job do the
clustering execution is recommended.\n"
+ + "Default is true", required = false)
+ public Boolean schedule = true;
+
+ @Parameter(names = {"--clean-async-enabled"}, description = "Whether to
cleanup the old commits immediately on new commits, enabled by default",
required = false)
+ public Boolean cleanAsyncEnable = false;
+
+ @Parameter(names = {"--plan-strategy-class"}, description = "Config to
provide a strategy class to generator clustering plan", required = false)
+ public String planStrategyClass =
"org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy";
+
+ @Parameter(names = {"--target-file-max-bytes"}, description = "Each group
can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output
file groups, default 1 GB", required = false)
+ public Integer targetFileMaxBytes = 1024;
+
+ @Parameter(names = {"--small-file-limit"}, description = "Files smaller than
the size specified here are candidates for clustering, default 600 MB",
required = false)
+ public Integer smallFileLimit = 600;
+
+ @Parameter(names = {"--skip-from-latest-partitions"}, description = "Number
of partitions to skip from latest when choosing partitions to create
ClusteringPlan, default 0", required = false)
+ public Integer skipFromLatestPartitions = 0;
+
+ @Parameter(names = {"--sort-columns"}, description = "Columns to sort the
data by when clustering, default `uuid`", required = false)
+ public String sortColumns;
+
+ @Parameter(names = {"--max-num-groups"}, description = "Maximum number of
groups to create as part of ClusteringPlan. Increasing groups will increase
parallelism. default 30", required = false)
+ public Integer maxNumGroups = 30;
+
+ @Parameter(names = {"--target-partitions"}, description = "Number of
partitions to list to create ClusteringPlan, default 2", required = false)
+ public Integer targetPartitions = 2;
+
+ public static final String SEQ_FIFO = "FIFO";
+ public static final String SEQ_LIFO = "LIFO";
+ @Parameter(names = {"--seq"}, description = "Clustering plan execution
sequence, two options are supported:\n"
+ + "1). FIFO: execute the oldest plan first;\n"
+ + "2). LIFO: execute the latest plan first, by default LIFO", required =
false)
+ public String clusteringSeq = SEQ_LIFO;
+
+ @Parameter(names = {"--write-partition-url-encode"}, description = "Whether
to encode the partition path url, default false")
+ public Boolean writePartitionUrlEncode = false;
+
+ @Parameter(names = {"--hive-style-partitioning"}, description = "Whether to
use Hive style partitioning.\n"
+ + "If set true, the names of partition folders follow
<partition_column_name>=<partition_value> format.\n"
+ + "By default false (the names of partition folders are only partition
values)")
+ public Boolean hiveStylePartitioning = false;
+
+ /**
+ * Transforms a {@code FlinkClusteringConfig.config} into {@code
Configuration}.
+ * The latter is more suitable for the table APIs. It reads all the
properties
+ * in the properties file (set by `--props` option) and cmd line options
+ * (set by `--hoodie-conf` option).
+ */
+ public static Configuration toFlinkConfig(FlinkClusteringConfig config) {
+ Configuration conf = new Configuration();
+
+ conf.setString(FlinkOptions.PATH, config.path);
+ conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS,
config.archiveMaxCommits);
+ conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS,
config.archiveMinCommits);
+ conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS,
config.cleanRetainCommits);
+ conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY,
config.compactionMaxMemory);
+ conf.setInteger(FlinkOptions.CLUSTERING_TASKS, config.clusteringTasks);
+    conf.setString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS, config.planStrategyClass);
+    conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES, config.targetFileMaxBytes);
+    conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT, config.smallFileLimit);
+    conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, config.skipFromLatestPartitions);
+    if (config.sortColumns != null) {
+      conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, config.sortColumns);
+    }
Review comment:
   What is `CLUSTERING_SORT_COLUMNS` used for?
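
   If the intent is to feed the clustering strategies, the option presumably needs to be forwarded into the Hudi write config somewhere, e.g. (a hypothetical sketch; the `HoodieClusteringConfig` builder method name is an assumption, not taken from this PR):

   ```java
   // hypothetical sketch: map the Flink option into the Hudi write config so the
   // plan/execution strategy can pick up the sort columns (builder name assumed)
   .withClusteringConfig(HoodieClusteringConfig.newBuilder()
       .withClusteringSortColumns(conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS))
       .build())
   ```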
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/cluster/FlinkClusteringPlanActionExecutor.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+@SuppressWarnings("checkstyle:LineLength")
+public class FlinkClusteringPlanActionExecutor<T extends HoodieRecordPayload> extends
+    BaseClusteringPlanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+
+  private static final Logger LOG = LogManager.getLogger(FlinkClusteringPlanActionExecutor.class);
+
+  public FlinkClusteringPlanActionExecutor(HoodieEngineContext context,
Review comment:
   We can use `HoodieData` to merge this code with
`SparkClusteringPlanActionExecutor`; this can be done in a separate follow-up
PR.
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;
+import
org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
+
+/**
+ * Clustering Strategy based on following.
+ * 1) Only looks at latest 'daybased.lookback.partitions' partitions.
+ * 2) Excludes files that are greater than 'small.file.limit' from clustering
plan.
+ */
+public class FlinkRecentDaysClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
+    extends PartitionAwareClusteringPlanStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+  private static final Logger LOG = LogManager.getLogger(FlinkRecentDaysClusteringPlanStrategy.class);
Review comment:
   Should we extend `FlinkSizeBasedClusteringPlanStrategy` instead?
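
   i.e., something along these lines (a sketch; the parent's type parameters are assumed to mirror the Spark counterpart):

   ```java
   public class FlinkRecentDaysClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
       extends FlinkSizeBasedClusteringPlanStrategy<T> {
     // keep only the recent-days partition filtering here and inherit the
     // size-based small-file selection from the parent strategy
   }
   ```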
##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -489,4 +501,9 @@ public static String getLastCompletedInstant(HoodieTableMetaClient metaClient) {
   public static boolean haveSuccessfulCommits(HoodieTableMetaClient metaClient) {
     return !metaClient.getCommitsTimeline().filterCompletedInstants().empty();
   }
+
+  public static Schema getTableAvroSchema(HoodieFlinkTable<?> table, Configuration conf, boolean includeMetadataFields) throws Exception {
+    TableSchemaResolver schemaUtil = new TableSchemaResolver(table.getMetaClient(), conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
+    return schemaUtil.getTableAvroSchema(includeMetadataFields);
Review comment:
   Change the first param to `HoodieTableMetaClient`.
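
   A sketch of the suggested signature (body unchanged apart from the parameter):

   ```java
   public static Schema getTableAvroSchema(HoodieTableMetaClient metaClient, Configuration conf, boolean includeMetadataFields) throws Exception {
     TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
     return schemaUtil.getTableAvroSchema(includeMetadataFields);
   }
   ```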
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
##########
@@ -528,6 +528,66 @@ private FlinkOptions() {
       .defaultValue(20)// default min 20 commits
       .withDescription("Min number of commits to keep before archiving older commits into a sequential log, default 20");
+  // ------------------------------------------------------------------------
+  //  Clustering Options
+  // ------------------------------------------------------------------------
+
+  public static final ConfigOption<Boolean> CLUSTERING_SCHEDULE_ENABLED = ConfigOptions
+      .key("clustering.schedule.enabled")
+      .booleanType()
+      .defaultValue(false) // default false for pipeline
+      .withDescription("Async clustering, default false for pipeline");
Review comment:
   The description should read: "Schedule the clustering plan, default false".
##########
File path:
hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.cluster;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
+import org.apache.hudi.sink.clustering.ClusteringCommitSink;
+import org.apache.hudi.sink.clustering.ClusteringFunction;
+import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
+import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.avro.Schema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ITTestHoodieFlinkClustering {
+
+ private static final Map<String, String> EXPECTED = new HashMap<>();
+
+ static {
+ EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1,
id2,par1,id2,Stephen,33,2000,par1]");
+ EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2,
id4,par2,id4,Fabian,31,4000,par2]");
+ EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3,
id6,par3,id6,Emma,20,6000,par3]");
+ EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4,
id8,par4,id8,Han,56,8000,par4]");
+ }
+
+ @TempDir
+ File tempFile;
+
+ @Test
+ public void testHoodieFlinkClustering() throws Exception {
+ // Create hoodie table and insert into data.
+ EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
+ TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+ tableEnv.getConfig().getConfiguration()
+
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+
+ // use append mode
+ options.put(FlinkOptions.OPERATION.key(),
WriteOperationType.INSERT.value());
+ options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
+
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1",
options);
+ tableEnv.executeSql(hoodieTableDDL);
+ String insertInto = "insert into t1 values\n"
+ + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+ + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+ + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
+ + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
+ + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
+ + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
+ + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
+ + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
+ tableEnv.executeSql(insertInto).await();
+
+ // wait for the asynchronous commit to finish
+ TimeUnit.SECONDS.sleep(3);
+
+ // Make configuration and setAvroSchema.
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+ cfg.path = tempFile.getAbsolutePath();
+ cfg.targetPartitions = 4;
+ Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+ // create metaClient
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+ // set the table name
+ conf.setString(FlinkOptions.TABLE_NAME,
metaClient.getTableConfig().getTableName());
+ conf.setString(FlinkOptions.TABLE_TYPE,
metaClient.getTableConfig().getTableType().name());
+
+ // set record key field
+ conf.setString(FlinkOptions.RECORD_KEY_FIELD,
metaClient.getTableConfig().getRecordKeyFieldProp());
+ // set partition field
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD,
metaClient.getTableConfig().getPartitionFieldProp());
+
+ long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
+ conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
+
+ // set table schema
+ CompactionUtil.setAvroSchema(conf, metaClient);
+
+ // judge whether have operation
+ // To compute the clustering instant time and do clustering.
+ String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+
+ HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf,
null);
+ HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+ boolean scheduled =
writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+
+ assertTrue(scheduled, "The clustering plan should be scheduled");
+
+ // fetch the instant based on the configured execution sequence
+ table.getMetaClient().reloadActiveTimeline();
+ HoodieTimeline timeline =
table.getActiveTimeline().filterPendingReplaceTimeline()
+ .filter(instant -> instant.getState() ==
HoodieInstant.State.REQUESTED);
+
+ // generate clustering plan
+ // should support configurable commit metadata
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
+        table.getMetaClient(), timeline.lastInstant().get());
+
+    HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
+
+    final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table, conf, false);
+    final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
+    final RowType rowType = (RowType) rowDataType.getLogicalType();
+
+    env.addSource(new ClusteringPlanSourceFunction(table, timeline.lastInstant().get(), clusteringPlan))
+        .name("clustering_source")
+        .uid("uid_clustering_source")
+        .rebalance()
+        .transform("clustering_task",
+            TypeInformation.of(ClusteringCommitEvent.class),
+            new ProcessOperator<>(new ClusteringFunction(conf, rowType)))
+        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
+        .addSink(new ClusteringCommitSink(conf))
Review comment:
   Why is the parallelism set to `WRITE_TASKS`? Can we set a default
parallelism the way `HoodieFlinkCompactor` does?
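
   For example, a sketch using the dedicated option added in this PR:

   ```java
       .transform("clustering_task",
           TypeInformation.of(ClusteringCommitEvent.class),
           new ProcessOperator<>(new ClusteringFunction(conf, rowType)))
       .setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS))
       .addSink(new ClusteringCommitSink(conf))
   ```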
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
##########
@@ -528,6 +528,66 @@ private FlinkOptions() {
       .defaultValue(20)// default min 20 commits
       .withDescription("Min number of commits to keep before archiving older commits into a sequential log, default 20");
+  // ------------------------------------------------------------------------
+  //  Clustering Options
+  // ------------------------------------------------------------------------
+
+  public static final ConfigOption<Boolean> CLUSTERING_SCHEDULE_ENABLED = ConfigOptions
+      .key("clustering.schedule.enabled")
+      .booleanType()
+      .defaultValue(false) // default false for pipeline
+      .withDescription("Async clustering, default false for pipeline");
+
+  public static final ConfigOption<Integer> CLUSTERING_TASKS = ConfigOptions
+      .key("clustering.tasks")
+      .intType()
+      .defaultValue(10)
+      .withDescription("Parallelism of tasks that do actual clustering, default is 10");
+
+  public static final ConfigOption<Integer> CLUSTERING_TARGET_PARTITIONS = ConfigOptions
+      .key("clustering.plan.strategy.daybased.lookback.partitions")
+      .intType()
+      .defaultValue(2)
+      .withDescription("Number of partitions to list to create ClusteringPlan");
+
+  public static final ConfigOption<String> CLUSTERING_PLAN_STRATEGY_CLASS = ConfigOptions
+      .key("clustering.plan.strategy.class")
+      .stringType()
+      .defaultValue("org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy")
+      .withDescription("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan "
+          + "i.e select what file groups are being clustered. Default strategy, looks at the last N (determined by "
+          + CLUSTERING_TARGET_PARTITIONS.key() + ") day based partitions picks the small file slices within those partitions.");
+
+  public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
+      .key("clustering.plan.strategy.target.file.max.bytes")
+      .intType()
+      .defaultValue(1024 * 1024 * 1024) // default 1 GB
+      .withDescription("Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB");
+
+  public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions
+      .key("clustering.plan.strategy.small.file.limit")
+      .intType()
+      .defaultValue(600) // default 600 MB
+      .withDescription("Files smaller than the size specified here are candidates for clustering, default 600 MB");
+
+  public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigOptions
+      .key("clustering.plan.strategy.daybased.skipfromlatest.partitions")
+      .intType()
+      .defaultValue(0)
+      .withDescription("Number of partitions to skip from latest when choosing partitions to create ClusteringPlan");
+
+  public static final ConfigOption<String> CLUSTERING_SORT_COLUMNS = ConfigOptions
+      .key("clustering.plan.strategy.sort.columns")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("Columns to sort the data by when clustering");
+
+  public static final ConfigOption<Integer> CLUSTERING_MAX_NUM_GROUPS = ConfigOptions
+      .key("clustering.plan.strategy.max.num.groups")
+      .intType()
+      .defaultValue(30)
+      .withDescription("Maximum number of groups to create as part of ClusteringPlan. Increasing groups will increase parallelism");
+
Review comment:
   Append ", default 30" to the description.
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -409,6 +413,40 @@ private void writeTableMetadata(HoodieTable<T, List<HoodieRecord<T>>, List<Hoodi
     }
   }
+  private void completeClustering(
+      HoodieReplaceCommitMetadata metadata, List<WriteStatus> writeStatuses,
+      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+      String clusteringCommitTime) {
+    List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
+        e.getValue().stream()).collect(Collectors.toList());
+    if (writeStats.stream().mapToLong(s -> s.getTotalWriteErrors()).sum() > 0) {
+      throw new HoodieClusteringException("Clustering failed to write to files:"
+          + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
+    }
Review comment:
`writeTableMetadata` is missing.
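
   For reference, a hedged sketch of where the call could go; the exact `writeTableMetadata` parameters are assumed here, following the existing private method shown in the diff header above:

   ```java
   // hedged sketch: sync the commit metadata to the metadata table before
   // completing the replace commit (parameter list assumed)
   writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT,
       HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime));
   ```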
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.table.HoodieFlinkTable;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flink hudi clustering source function.
+ *
+ * <P>This function read the clustering plan as {@link ClusteringOperation}s
then assign the clustering task
+ * event {@link ClusteringPlanEvent} to downstream operators.
+ *
+ * <p>The clustering instant time is specified explicitly with strategies:
+ *
+ * <ul>
+ * <li>If the timeline has no inflight instants,
+ * use {@link
org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()}
+ * as the instant time;</li>
+ * <li>If the timeline has inflight instants,
+ * use the median instant time between [last complete instant time, earliest
inflight instant time]
+ * as the instant time.</li>
+ * </ul>
+ */
+public class ClusteringPlanSourceFunction extends AbstractRichFunction
implements SourceFunction<ClusteringPlanEvent> {
+
+ protected static final Logger LOG =
LoggerFactory.getLogger(ClusteringPlanSourceFunction.class);
+
+ /**
+ * Hoodie flink table.
+ */
+ private final HoodieFlinkTable<?> table;
+
+ /**
+ * The clustering plan.
+ */
+ private final HoodieClusteringPlan clusteringPlan;
+
+ /**
+ * Hoodie instant.
+ */
+ private final HoodieInstant instant;
+
+ public ClusteringPlanSourceFunction(HoodieFlinkTable<?> table, HoodieInstant
instant, HoodieClusteringPlan clusteringPlan) {
+ this.table = table;
+ this.instant = instant;
+ this.clusteringPlan = clusteringPlan;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ // no operation
+ }
+
+  @Override
+  public void run(SourceContext<ClusteringPlanEvent> sourceContext) throws Exception {
+    // Mark instant as clustering inflight
+    table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
+    table.getMetaClient().reloadActiveTimeline();
+
Review comment:
   Move the `transitionReplaceRequestedToInflight` call out of this function into
`HoodieFlinkClusteringJob#main` and remove the member variable `table`.
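
   A sketch of what that could look like in `HoodieFlinkClusteringJob#main`, assuming `timeline` is the pending replace timeline fetched the same way as in the test above:

   ```java
   // hedged sketch: do the transition before wiring the pipeline, so
   // ClusteringPlanSourceFunction no longer needs the `table` member
   HoodieInstant clusteringInstant = timeline.lastInstant().get();
   table.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringInstant, Option.empty());
   table.getMetaClient().reloadActiveTimeline();
   ```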
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+
+import com.beust.jcommander.JCommander;
+import org.apache.avro.Schema;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flink hudi clustering program that can be executed manually.
+ */
+public class HoodieFlinkClusteringJob {
+
+ protected static final Logger LOG =
LoggerFactory.getLogger(HoodieFlinkClusteringJob.class);
+
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+ FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+ JCommander cmd = new JCommander(cfg, null, args);
+ if (cfg.help || args.length == 0) {
+ cmd.usage();
+ System.exit(1);
+ }
+
+ Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+ // create metaClient
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+ // set table name
+ conf.setString(FlinkOptions.TABLE_NAME,
metaClient.getTableConfig().getTableName());
+
+ // set table type
+ conf.setString(FlinkOptions.TABLE_TYPE,
metaClient.getTableConfig().getTableType().name());
+
+ // set record key field
+ conf.setString(FlinkOptions.RECORD_KEY_FIELD,
metaClient.getTableConfig().getRecordKeyFieldProp());
+
+ // set partition field
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD,
metaClient.getTableConfig().getPartitionFieldProp());
+
+ // set table schema
+ CompactionUtil.setAvroSchema(conf, metaClient);
+
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
Review comment:
Should use `StreamerUtil.createWriteClient(conf)` instead to start the
embedded timeline server.
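
   i.e., a short sketch of the suggested change (assuming that overload is available):

   ```java
   // use the overload without an explicit runtime context so that the
   // embedded timeline server is started for this standalone job
   HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
   HoodieFlinkTable<?> table = writeClient.getHoodieTable();
   ```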
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitEvent.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.client.WriteStatus;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Represents a commit event from the clustering task {@link
ClusteringFunction}.
+ */
+public class ClusteringCommitEvent implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * The clustering commit instant time.
+ */
+ private String instant;
+ /**
+ * The write statuses.
+ */
+ private List<WriteStatus> writeStatuses;
+ /**
+ * The clustering task identifier.
+ */
+ private int taskID;
+
Review comment:
   The event should include a `fileId` so that commit events can be deduplicated
on task failover/retry; take `CompactionCommitEvent` as a reference. Because a
`HoodieClusteringGroup` has multiple input file ids (unlike a
`CompactionCommitEvent`), we can use the first file group id to distinguish.
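
   A hedged sketch of the suggested shape (the field and constructor are illustrative, not taken from the PR):

   ```java
   /**
    * The first file id of the clustering group, used to deduplicate commit
    * events when a task fails over and retries (see CompactionCommitEvent).
    */
   private String fileId;

   public ClusteringCommitEvent(String instant, String fileId, List<WriteStatus> writeStatuses, int taskID) {
     this.instant = instant;
     this.fileId = fileId;
     this.writeStatuses = writeStatuses;
     this.taskID = taskID;
   }
   ```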
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringFunction.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.HoodieFileSliceReader;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
+
+/**
+ * Function to execute the actual clustering task assigned by the clustering
plan task.
+ * In order to execute scalable, the input should shuffle by the clustering
event {@link ClusteringPlanEvent}.
+ */
+public class ClusteringFunction extends ProcessFunction<ClusteringPlanEvent,
ClusteringCommitEvent> {
+ private static final Logger LOG =
LoggerFactory.getLogger(ClusteringFunction.class);
+
+ /**
+ * Config options.
+ */
+ private final Configuration conf;
+
+ private final RowType rowType;
+
+ /**
+ * Id of current subtask.
+ */
+ private int taskID;
+
+ private transient HoodieWriteConfig writeConfig;
+
+ private transient HoodieFlinkTable<?> table;
+
+ private transient Schema schema;
+
+ private transient Schema readerSchema;
+
+ private transient int[] requiredPos;
+
+ private transient AvroToRowDataConverters.AvroToRowDataConverter
avroToRowDataConverter;
+
+ private transient BulkInsertWriterHelper writerHelper;
+
+ public ClusteringFunction(Configuration conf, RowType rowType) {
+ this.conf = conf;
+ this.rowType = rowType;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+ this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
+ HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf,
getRuntimeContext());
+ this.table = writeClient.getHoodieTable();
+
+ this.schema = new Schema.Parser().parse(writeConfig.getSchema());
+ this.readerSchema = HoodieAvroUtils.addMetadataFields(this.schema);
+ this.requiredPos = getRequiredPositions();
+
+ this.avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(rowType);
+ }
+
+ @Override
+ public void processElement(ClusteringPlanEvent event, Context context,
Collector<ClusteringCommitEvent> collector) throws Exception {
+ final String instantTime = event.getClusteringInstantTime();
+ final HoodieClusteringGroup clusteringGroup = event.getClusteringGroup();
+
+ initWriterHelper(instantTime);
+
+ // executes the clustering task synchronously for batch mode.
+ LOG.info("Execute clustering for instant {} from task {}", instantTime,
taskID);
+ doClustering(instantTime, clusteringGroup, collector);
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ private void initWriterHelper(String clusteringInstantTime) {
+ if (this.writerHelper == null) {
+ this.writerHelper = new BulkInsertWriterHelper(this.conf, this.table,
this.writeConfig,
+ clusteringInstantTime, this.taskID,
getRuntimeContext().getNumberOfParallelSubtasks(),
getRuntimeContext().getAttemptNumber(),
+ this.rowType);
+ }
+ }
+
+  private void doClustering(String instantTime, HoodieClusteringGroup clusteringGroup, Collector<ClusteringCommitEvent> collector) throws IOException {
+    List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
+    boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
+
Review comment:
   A `HoodieClusteringGroup` specifies a number of output file groups, but the
current code writes only one file group (or more if the parquet size hits the
threshold). Can we find a way to set the parallelism of the bulk_insert writer
based on that number?
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringFunction.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.HoodieFileSliceReader;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
+
+/**
+ * Function to execute the actual clustering task assigned by the clustering
plan task.
+ * In order to execute scalable, the input should shuffle by the clustering
event {@link ClusteringPlanEvent}.
+ */
+public class ClusteringFunction extends ProcessFunction<ClusteringPlanEvent,
ClusteringCommitEvent> {
+ private static final Logger LOG =
LoggerFactory.getLogger(ClusteringFunction.class);
+
+ /**
+ * Config options.
+ */
+ private final Configuration conf;
+
+ private final RowType rowType;
+
+ /**
+ * Id of current subtask.
+ */
+ private int taskID;
+
+ private transient HoodieWriteConfig writeConfig;
+
+ private transient HoodieFlinkTable<?> table;
+
+ private transient Schema schema;
+
+ private transient Schema readerSchema;
+
+ private transient int[] requiredPos;
+
+ private transient AvroToRowDataConverters.AvroToRowDataConverter
avroToRowDataConverter;
+
+ private transient BulkInsertWriterHelper writerHelper;
+
+ public ClusteringFunction(Configuration conf, RowType rowType) {
+ this.conf = conf;
+ this.rowType = rowType;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+    this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
+    this.table = writeClient.getHoodieTable();
+
+    this.schema = new Schema.Parser().parse(writeConfig.getSchema());
+    this.readerSchema = HoodieAvroUtils.addMetadataFields(this.schema);
+    this.requiredPos = getRequiredPositions();
+
Review comment:
   What is `requiredPos` used for?
##########
File path:
hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.cluster;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
+import org.apache.hudi.sink.clustering.ClusteringCommitSink;
+import org.apache.hudi.sink.clustering.ClusteringFunction;
+import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
+import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.avro.Schema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ITTestHoodieFlinkClustering {
+
+ private static final Map<String, String> EXPECTED = new HashMap<>();
+
+ static {
+ EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1,
id2,par1,id2,Stephen,33,2000,par1]");
+ EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2,
id4,par2,id4,Fabian,31,4000,par2]");
+ EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3,
id6,par3,id6,Emma,20,6000,par3]");
+ EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4,
id8,par4,id8,Han,56,8000,par4]");
+ }
+
+ @TempDir
+ File tempFile;
+
+ @Test
+ public void testHoodieFlinkClustering() throws Exception {
+ // Create hoodie table and insert into data.
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+ TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+ tableEnv.getConfig().getConfiguration()
+ .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+
+ // use append mode
+ options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
+ options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
+
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+ tableEnv.executeSql(hoodieTableDDL);
+ String insertInto = "insert into t1 values\n"
+ + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+ + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
Review comment:
Use `TestSql.INSERT_T1` instead.
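If the suggestion is applied, the hand-written VALUES statement above would collapse to the shared test constant, roughly as follows (identifier spelled as the reviewer wrote it; the constant is assumed to carry the same eight rows):

  // Sketch of the suggested change in the test body.
  tableEnv.executeSql(TestSql.INSERT_T1);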
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
##########
@@ -528,6 +528,66 @@ private FlinkOptions() {
.defaultValue(20)// default min 20 commits
.withDescription("Min number of commits to keep before archiving older
commits into a sequential log, default 20");
+ // ------------------------------------------------------------------------
+ // Clustering Options
+ // ------------------------------------------------------------------------
+
+ public static final ConfigOption<Boolean> CLUSTERING_SCHEDULE_ENABLED = ConfigOptions
+ .key("clustering.schedule.enabled")
+ .booleanType()
+ .defaultValue(false) // default false for pipeline
+ .withDescription("Async clustering, default false for pipeline");
+
+ public static final ConfigOption<Integer> CLUSTERING_TASKS = ConfigOptions
+ .key("clustering.tasks")
+ .intType()
+ .defaultValue(10)
+ .withDescription("Parallelism of tasks that do actual clustering,
default is 10");
+
+ public static final ConfigOption<Integer> CLUSTERING_TARGET_PARTITIONS = ConfigOptions
+ .key("clustering.plan.strategy.daybased.lookback.partitions")
+ .intType()
+ .defaultValue(2)
+ .withDescription("Number of partitions to list to create
ClusteringPlan");
+
Review comment:
Append `, default 2` to the description.
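Applied to the option quoted above, that would presumably read:

  public static final ConfigOption<Integer> CLUSTERING_TARGET_PARTITIONS = ConfigOptions
      .key("clustering.plan.strategy.daybased.lookback.partitions")
      .intType()
      .defaultValue(2)
      .withDescription("Number of partitions to list to create ClusteringPlan, default 2");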
##########
File path:
hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.cluster;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
+import org.apache.hudi.sink.clustering.ClusteringCommitSink;
+import org.apache.hudi.sink.clustering.ClusteringFunction;
+import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
+import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.avro.Schema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ITTestHoodieFlinkClustering {
Review comment:
/**
* IT cases for {@link HoodieFlinkClusteringJob}.
*/
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringFunction.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.HoodieFileSliceReader;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
+
+/**
+ * Function to execute the actual clustering task assigned by the clustering plan task.
+ * To make the execution scalable, the input should be shuffled by the clustering event {@link ClusteringPlanEvent}.
+ */
+public class ClusteringFunction extends ProcessFunction<ClusteringPlanEvent, ClusteringCommitEvent> {
+ private static final Logger LOG = LoggerFactory.getLogger(ClusteringFunction.class);
+
+ /**
+ * Config options.
+ */
+ private final Configuration conf;
+
+ private final RowType rowType;
+
+ /**
+ * Id of current subtask.
+ */
+ private int taskID;
+
+ private transient HoodieWriteConfig writeConfig;
+
+ private transient HoodieFlinkTable<?> table;
+
+ private transient Schema schema;
+
+ private transient Schema readerSchema;
+
+ private transient int[] requiredPos;
+
+ private transient AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
+
+ private transient BulkInsertWriterHelper writerHelper;
+
+ public ClusteringFunction(Configuration conf, RowType rowType) {
+ this.conf = conf;
+ this.rowType = rowType;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+ this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
+ HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
+ this.table = writeClient.getHoodieTable();
+
+ this.schema = new Schema.Parser().parse(writeConfig.getSchema());
+ this.readerSchema = HoodieAvroUtils.addMetadataFields(this.schema);
+ this.requiredPos = getRequiredPositions();
+
+ this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(rowType);
+ }
+
+ @Override
+ public void processElement(ClusteringPlanEvent event, Context context, Collector<ClusteringCommitEvent> collector) throws Exception {
+ final String instantTime = event.getClusteringInstantTime();
+ final HoodieClusteringGroup clusteringGroup = event.getClusteringGroup();
+
+ initWriterHelper(instantTime);
+
+ // executes the clustering task synchronously for batch mode.
+ LOG.info("Execute clustering for instant {} from task {}", instantTime,
taskID);
+ doClustering(instantTime, clusteringGroup, collector);
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ private void initWriterHelper(String clusteringInstantTime) {
+ if (this.writerHelper == null) {
+ this.writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
+ clusteringInstantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
+ this.rowType);
+ }
+ }
+
+ private void doClustering(String instantTime, HoodieClusteringGroup clusteringGroup, Collector<ClusteringCommitEvent> collector) throws IOException {
+ List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
+ boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
+
+ Iterator<RowData> iterator;
+ if (hasLogFiles) {
+ // if there are log files, we read all records into memory for a file group and apply updates.
+ iterator = readRecordsForGroupWithLogs(clusteringOps, instantTime);
+ } else {
+ // We want to optimize reading records for the case where there are no log files.
+ iterator = readRecordsForGroupBaseFiles(clusteringOps);
+ }
+
+ while (iterator.hasNext()) {
+ this.writerHelper.write(iterator.next());
+ }
+ List<WriteStatus> writeStatuses = this.writerHelper.getWriteStatuses(this.taskID);
+ collector.collect(new ClusteringCommitEvent(instantTime, writeStatuses, this.taskID));
+ }
+
+ /**
+ * Read records from baseFiles, apply updates and convert to Iterator.
+ */
+ @SuppressWarnings("unchecked")
+ private Iterator<RowData> readRecordsForGroupWithLogs(List<ClusteringOperation> clusteringOps, String instantTime) {
+ List<Iterator<RowData>> recordIterators = new ArrayList<>();
+
+ long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new FlinkTaskContextSupplier(null), writeConfig);
+ LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
+
+ for (ClusteringOperation clusteringOp : clusteringOps) {
+ try {
+ Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
+ HoodieFileReader<? extends IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
Review comment:
Use the `readerSchema` ?
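If the point is that the just-built `readerSchema` should actually drive the base-file read rather than being left unused, one possible shape is sketched below, assuming the `HoodieFileReader#getRecordIterator(Schema)` overload is available in this Hudi version:

  // Sketch only: hand the readerSchema (table schema plus Hudi metadata fields) to the
  // record iterator so the base file is read with a consistent schema.
  HoodieFileReader<? extends IndexedRecord> baseFileReader =
      HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
  Iterator<? extends IndexedRecord> baseRecords = baseFileReader.getRecordIterator(readerSchema);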
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]