This is an automated email from the ASF dual-hosted git repository.
zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1acec1e64551 [HUDI-9593] support custom partitioner in append mode
(#13557)
1acec1e64551 is described below
commit 1acec1e64551f33f3cf24449fe3cd52db39513fa
Author: fhan <[email protected]>
AuthorDate: Thu Jul 17 20:02:36 2025 +0800
[HUDI-9593] support custom partitioner in append mode (#13557)
* [HUDI-9593] support custom partitioner in append mode
* cosmetic changes
* rename GroupedInsertPartitioner & set groupNumber & fix checkstyle
---------
Co-authored-by: fhan <[email protected]>
Co-authored-by: danny0405 <[email protected]>
---
.../apache/hudi/configuration/FlinkOptions.java | 14 ++++
.../apache/hudi/configuration/OptionsResolver.java | 17 +++++
.../sink/partitioner/GroupedInsertPartitioner.java | 88 ++++++++++++++++++++++
.../java/org/apache/hudi/sink/utils/Pipelines.java | 7 ++
.../partitioner/TestGroupedInsertPartitioner.java | 60 +++++++++++++++
5 files changed, 186 insertions(+)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 4b5287280530..8ee7fb2b8f2c 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -128,6 +128,20 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("Payload class used. Override this, if you like to roll
your own merge logic, when upserting/inserting.\n"
+ "This will render any value set for the option in-effective");
+  /**
+   * Fully qualified class name of a custom Flink {@code Partitioner} used to re-distribute records
+   * before they reach the append (insert) writers. An empty value means no custom partitioner.
+   */
+  public static final ConfigOption<String> INSERT_PARTITIONER_CLASS_NAME = ConfigOptions
+      .key("write.insert.partitioner.class.name")
+      .stringType()
+      .defaultValue("")
+      .withDescription("Insert partitioner to use aiming to re-balance records and reducing small file number "
+          + "in the scenario of multi-level partitioning. For example dt/hour/eventID. "
+          + "Currently support org.apache.hudi.sink.partitioner.GroupedInsertPartitioner");
+
+  /**
+   * Number of consecutive write tasks that records of one data partition are spread over when
+   * {@code GroupedInsertPartitioner} is configured as the insert partitioner.
+   */
+  public static final ConfigOption<Integer> DEFAULT_PARALLELISM_PER_PARTITION = ConfigOptions
+      .key("write.insert.partitioner.default_parallelism_per_partition")
+      .intType()
+      .defaultValue(30)
+      .withDescription("The parallelism to use in each partition when using GroupedInsertPartitioner. "
+          + "Must be positive and not greater than the write task parallelism.");
+
@AdvancedConfig
public static final ConfigOption<String> RECORD_MERGER_IMPLS = ConfigOptions
.key("record.merger.impls")
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index e6b3a361b6dc..fb8a15edd636 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -31,6 +31,8 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -42,6 +44,7 @@ import org.apache.hudi.sink.overwrite.PartitionOverwriteMode;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.HoodieFlinkIOFactory;
+import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
@@ -487,6 +490,20 @@ public class OptionsResolver {
return isCowTable(conf) && isUpsertOperation(conf);
}
+  /**
+   * Instantiates the user-configured insert partitioner, if there is one.
+   *
+   * <p>The class name is read from {@link FlinkOptions#INSERT_PARTITIONER_CLASS_NAME}. A null or
+   * empty value means no custom partitioner is configured. Otherwise the class is instantiated
+   * reflectively through {@code ReflectionUtils.loadClass}, passing {@code conf} as the constructor
+   * argument.
+   *
+   * @param conf the Flink configuration
+   * @return the configured partitioner, or {@link Option#empty()} when none is configured
+   * @throws HoodieException if the class cannot be loaded, instantiated or cast to {@link Partitioner}
+   */
+  public static Option<Partitioner> getInsertPartitioner(Configuration conf) {
+    final String partitionerClassName = conf.getString(FlinkOptions.INSERT_PARTITIONER_CLASS_NAME);
+    if (StringUtils.isNullOrEmpty(partitionerClassName)) {
+      return Option.empty();
+    }
+    try {
+      Object instance = ReflectionUtils.loadClass(partitionerClassName, conf);
+      return Option.of((Partitioner) instance);
+    } catch (Throwable e) {
+      // Also wraps linkage errors and bad casts so the user sees which class failed.
+      throw new HoodieException("Could not create custom insert partitioner " + partitionerClassName, e);
+    }
+  }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GroupedInsertPartitioner.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GroupedInsertPartitioner.java
new file mode 100644
index 000000000000..b00982009715
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GroupedInsertPartitioner.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Random;
+
+/**
+ * Insert input partitioner that keeps records of the same partition path within a bounded group
+ * of consecutive downstream write tasks. This re-balances records and reduces the number of small
+ * files in multi-level partitioning scenarios (e.g. {@code dt/hour/eventID}).
+ *
+ * <p>Routing works as follows:
+ * <ol>
+ *   <li>The {@code numPartitions} downstream channels are split into
+ *   {@code groupNumber = numPartitions / groupLength} groups of {@code groupLength} consecutive
+ *   channels, where {@code groupLength} is {@link FlinkOptions#DEFAULT_PARALLELISM_PER_PARTITION}.</li>
+ *   <li>A record is routed to group
+ *   {@code (partitionPath.hashCode() & Integer.MAX_VALUE) % groupNumber}.</li>
+ *   <li>Within that group a channel is picked uniformly at random.</li>
+ * </ol>
+ *
+ * <p>When {@code numPartitions} is not a multiple of {@code groupLength}, the leftover
+ * {@code numPartitions % groupLength} channels are appended to the last group so that no
+ * downstream task is wasted.
+ *
+ * <p>The group layout is computed on first use and cached. It is recomputed only if a different
+ * {@code numPartitions} is passed in.
+ */
+public class GroupedInsertPartitioner<T extends HoodieKey> implements Partitioner<T> {
+
+  private final int groupLength;
+  private final Random random;
+
+  // Channel count the cached layout below was computed for; -1 until the first successful setup.
+  private int cachedNumPartitions = -1;
+  private int groupNumber = -1;
+  private int remaining = -1;
+
+  /**
+   * Creates the partitioner.
+   *
+   * @param conf Flink configuration providing {@link FlinkOptions#DEFAULT_PARALLELISM_PER_PARTITION}
+   * @throws IllegalArgumentException if the configured parallelism per partition is not positive
+   */
+  public GroupedInsertPartitioner(Configuration conf) {
+    this.groupLength = conf.get(FlinkOptions.DEFAULT_PARALLELISM_PER_PARTITION);
+    // Fail fast: a non-positive group length would otherwise break the division/nextInt below.
+    ValidationUtils.checkArgument(groupLength > 0,
+        String.format("%s must be positive, but was %d.",
+            FlinkOptions.DEFAULT_PARALLELISM_PER_PARTITION.key(), groupLength));
+    this.random = new Random();
+  }
+
+  /**
+   * Routes a record to a downstream channel. All records with the same partition path land in
+   * the same group of consecutive channels.
+   *
+   * @param hoodieKey     key of the record; only its partition path is used
+   * @param numPartitions number of downstream channels
+   * @return the target channel index in {@code [0, numPartitions)}
+   * @throws IllegalArgumentException if {@code numPartitions} is smaller than the configured
+   *                                  parallelism per partition
+   */
+  @Override
+  public int partition(HoodieKey hoodieKey, int numPartitions) {
+    setupIfNecessary(numPartitions);
+    String partitionPath = hoodieKey.getPartitionPath();
+    // A null partition path is routed like any other single partition instead of failing with an NPE.
+    int pathHash = partitionPath == null ? 0 : partitionPath.hashCode();
+    // Mask the sign bit so the modulo is never negative.
+    int groupIndex = (pathHash & Integer.MAX_VALUE) % groupNumber;
+    // The last group also owns the leftover channels (remaining is 0 when there are none).
+    int groupSize = groupIndex == groupNumber - 1 ? groupLength + remaining : groupLength;
+    return groupIndex * groupLength + random.nextInt(groupSize);
+  }
+
+  /**
+   * Computes and caches {@code groupNumber} and {@code remaining} for {@code numPartitions}.
+   * The computation is skipped when the cached layout already matches the given channel count.
+   *
+   * @param numPartitions number of downstream channels
+   * @throws IllegalArgumentException if {@code numPartitions} is smaller than the group length
+   */
+  private void setupIfNecessary(int numPartitions) {
+    if (numPartitions == cachedNumPartitions) {
+      return;
+    }
+    ValidationUtils.checkArgument(numPartitions >= groupLength,
+        String.format("%s (%d) must not be greater than the number of downstream partitions (%d).",
+            FlinkOptions.DEFAULT_PARALLELISM_PER_PARTITION.key(), groupLength, numPartitions));
+    groupNumber = numPartitions / groupLength;
+    remaining = numPartitions - groupNumber * groupLength;
+    cachedNumPartitions = numPartitions;
+  }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index e5fdf786e224..a540ea87e317 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.utils;
import org.apache.hudi.client.model.HoodieFlinkInternalRow;
import org.apache.hudi.client.model.HoodieFlinkInternalRowTypeInfo;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
@@ -206,6 +207,12 @@ public class Pipelines {
throw new HoodieNotSupportedException("Bucket index supports only upsert
operation. Please, use upsert operation or switch to another index type.");
}
+ Option<Partitioner> insertPartitioner =
OptionsResolver.getInsertPartitioner(conf);
+ if (insertPartitioner.isPresent()) {
+ RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+ dataStream = dataStream.partitionCustom(insertPartitioner.get(),
rowDataKeyGen::getHoodieKey);
+ }
+
WriteOperatorFactory<RowData> operatorFactory =
AppendWriteOperator.getFactory(conf, rowType);
return dataStream
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGroupedInsertPartitioner.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGroupedInsertPartitioner.java
new file mode 100644
index 000000000000..79df70d29823
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGroupedInsertPartitioner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestGroupedInsertPartitioner {
+
+  /**
+   * Routes records of many data partitions through the partitioner. It checks that every returned
+   * channel is a valid index and that (almost) all downstream channels receive records.
+   */
+  @Test
+  void testPartitioner() {
+    int groupLength = 30;
+    int numberFlinkPartitions = 2023;
+    int numberDataPartition = 1030;
+    int recordsPerPartition = 20;
+    String partitionPathPrefix = "dt=2023-11-27/hour=01/index=";
+
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.DEFAULT_PARALLELISM_PER_PARTITION, groupLength);
+    GroupedInsertPartitioner<HoodieKey> partitioner = new GroupedInsertPartitioner<>(conf);
+
+    // channel index -> number of records routed to it
+    HashMap<Integer, Integer> recordsPerChannel = new HashMap<>();
+    for (int partitionIndex = 0; partitionIndex < numberDataPartition; partitionIndex++) {
+      for (int recordIndex = 0; recordIndex < recordsPerPartition; recordIndex++) {
+        HoodieKey key = new HoodieKey("id" + recordIndex, partitionPathPrefix + partitionIndex);
+        int channel = partitioner.partition(key, numberFlinkPartitions);
+        assertTrue(channel >= 0 && channel < numberFlinkPartitions,
+            () -> "channel " + channel + " is out of range [0, " + numberFlinkPartitions + ")");
+        recordsPerChannel.merge(channel, 1, Integer::sum);
+      }
+    }
+
+    // Channels are picked at random inside each group, so tolerate up to
+    // numberFlinkPartitions % groupLength idle channels.
+    int usedChannels = recordsPerChannel.size();
+    int minExpected = numberFlinkPartitions - numberFlinkPartitions % groupLength;
+    assertTrue(usedChannels <= numberFlinkPartitions,
+        () -> "used " + usedChannels + " channels, more than " + numberFlinkPartitions);
+    assertTrue(usedChannels >= minExpected,
+        () -> "used only " + usedChannels + " channels, expected at least " + minExpected);
+  }
+
+}
\ No newline at end of file