This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1acec1e64551 [HUDI-9593] support custom partitioner in append mode 
(#13557)
1acec1e64551 is described below

commit 1acec1e64551f33f3cf24449fe3cd52db39513fa
Author: fhan <[email protected]>
AuthorDate: Thu Jul 17 20:02:36 2025 +0800

    [HUDI-9593] support custom partitioner in append mode (#13557)
    
    * [HUDI-9593] support custom partitioner in append mode
    
    * cosmetic changes
    
    * rename GroupedInsertPartitioner & set groupNumber &  fix checkstyle
    
    ---------
    
    Co-authored-by: fhan <[email protected]>
    Co-authored-by: danny0405 <[email protected]>
---
 .../apache/hudi/configuration/FlinkOptions.java    | 14 ++++
 .../apache/hudi/configuration/OptionsResolver.java | 17 +++++
 .../sink/partitioner/GroupedInsertPartitioner.java | 88 ++++++++++++++++++++++
 .../java/org/apache/hudi/sink/utils/Pipelines.java |  7 ++
 .../partitioner/TestGroupedInsertPartitioner.java  | 60 +++++++++++++++
 5 files changed, 186 insertions(+)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 4b5287280530..8ee7fb2b8f2c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -128,6 +128,20 @@ public class FlinkOptions extends HoodieConfig {
       .withDescription("Payload class used. Override this, if you like to roll 
your own merge logic, when upserting/inserting.\n"
           + "This will render any value set for the option in-effective");
 
+  /**
+   * Fully-qualified class name of the custom insert partitioner.
+   * The default is the empty string, i.e. no class name is configured.
+   */
+  public static final ConfigOption<String> INSERT_PARTITIONER_CLASS_NAME = ConfigOptions
+      .key("write.insert.partitioner.class.name")
+      .stringType()
+      .defaultValue("")
+      .withDescription("Insert partitioner to use aiming to re-balance records and reducing small file number "
+          + "in the scenario of multi-level partitioning. For example dt/hour/eventID. "
+          + "Currently support org.apache.hudi.sink.partitioner.GroupedInsertPartitioner");
+
+  /**
+   * Number of consecutive write tasks that form one task group in
+   * {@code org.apache.hudi.sink.partitioner.GroupedInsertPartitioner}.
+   */
+  public static final ConfigOption<Integer> DEFAULT_PARALLELISM_PER_PARTITION = ConfigOptions
+      .key("write.insert.partitioner.default_parallelism_per_partition")
+      .intType()
+      .defaultValue(30)
+      .withDescription("The parallelism to use in each partition when using GroupedInsertPartitioner.");
+
   @AdvancedConfig
   public static final ConfigOption<String> RECORD_MERGER_IMPLS = ConfigOptions
       .key("record.merger.impls")
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index e6b3a361b6dc..fb8a15edd636 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -31,6 +31,8 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import 
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -42,6 +44,7 @@ import org.apache.hudi.sink.overwrite.PartitionOverwriteMode;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.table.format.HoodieFlinkIOFactory;
 
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 
@@ -487,6 +490,20 @@ public class OptionsResolver {
     return isCowTable(conf) && isUpsertOperation(conf);
   }
 
+  /**
+   * Creates the insert partitioner configured through
+   * {@link FlinkOptions#INSERT_PARTITIONER_CLASS_NAME}, if any.
+   *
+   * @param conf The Flink configuration; it is also handed to {@code ReflectionUtils.loadClass}
+   *             together with the configured class name
+   * @return the partitioner instance, or an empty option when the class name is null or empty
+   * @throws HoodieException if loading or casting the configured class fails for any reason
+   */
+  public static Option<Partitioner> getInsertPartitioner(Configuration conf) {
+    final String partitionerClazz = conf.getString(FlinkOptions.INSERT_PARTITIONER_CLASS_NAME);
+    if (StringUtils.isNullOrEmpty(partitionerClazz)) {
+      // nothing configured, the caller keeps its default data distribution.
+      return Option.empty();
+    }
+    try {
+      Partitioner partitioner = (Partitioner) ReflectionUtils.loadClass(partitionerClazz, conf);
+      return Option.of(partitioner);
+    } catch (Throwable e) {
+      throw new HoodieException("Could not create custom insert partitioner " + partitionerClazz, e);
+    }
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GroupedInsertPartitioner.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GroupedInsertPartitioner.java
new file mode 100644
index 000000000000..b00982009715
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GroupedInsertPartitioner.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Random;
+
+/**
+ * Insert partitioner that confines each Hudi partition path to one fixed group of write tasks.
+ *
+ * <ol>
+ *   <li>The {@code numPartitions} downstream tasks are split into {@code numPartitions / groupLength}
+ *   groups of {@code groupLength} consecutive tasks.</li>
+ *   <li>A record is routed to the group
+ *   {@code (partitionPath.hashCode() & Integer.MAX_VALUE) % groupNumber}.</li>
+ *   <li>The task inside that group is picked at random.</li>
+ * </ol>
+ *
+ * <p>The tasks left over by the integer division are appended to the last group to avoid resource waste.
+ */
+public class GroupedInsertPartitioner<T extends HoodieKey> implements Partitioner<T> {
+
+  /** Number of tasks per group, read from {@link FlinkOptions#DEFAULT_PARALLELISM_PER_PARTITION}. */
+  private final int groupLength;
+  private final Random random;
+
+  /** The {@code numPartitions} value the cached layout below was computed for, -1 before the first record. */
+  private int layoutNumPartitions = -1;
+  /** Cached number of task groups. */
+  private int groupNumber = -1;
+  /** Cached number of left-over tasks that are appended to the last group. */
+  private int remaining = -1;
+
+  /**
+   * @param conf The Flink configuration to read the group length from
+   * @throws IllegalArgumentException if the configured group length is not positive
+   */
+  public GroupedInsertPartitioner(Configuration conf) {
+    this.groupLength = conf.get(FlinkOptions.DEFAULT_PARALLELISM_PER_PARTITION); // default 30 ==> parallelism per partition
+    // fail fast: a non-positive length would otherwise surface as a division by zero on the first record.
+    ValidationUtils.checkArgument(groupLength > 0,
+        String.format("%s must be positive, but got %d.",
+            FlinkOptions.DEFAULT_PARALLELISM_PER_PARTITION.key(), groupLength));
+    this.random = new Random();
+  }
+
+  /**
+   * Make sure that data with the same partition path is only routed to the same flink task group (groupIndex).
+   *
+   * @param hoodieKey     The key of the record, only its partition path is used
+   * @param numPartitions The number of downstream tasks
+   * @return the index of the target task, within the group chosen for the partition path
+   * @throws IllegalArgumentException if {@code numPartitions} is smaller than the group length
+   */
+  @Override
+  public int partition(HoodieKey hoodieKey, int numPartitions) {
+    setupIfNecessary(numPartitions);
+    int groupIndex = (hoodieKey.getPartitionPath().hashCode() & Integer.MAX_VALUE) % groupNumber;
+    // the last group also owns the tasks left over by the integer division.
+    int groupSize = groupIndex == groupNumber - 1 ? groupLength + remaining : groupLength;
+    return groupIndex * groupLength + random.nextInt(groupSize);
+  }
+
+  /**
+   * Computes and validates groupNumber and remaining once per distinct {@code numPartitions},
+   * so that the per-record path neither recomputes the layout nor formats the error message.
+   *
+   * @param numPartitions The number of downstream tasks
+   */
+  private void setupIfNecessary(int numPartitions) {
+    if (numPartitions == layoutNumPartitions) {
+      return;
+    }
+    int groups = numPartitions / groupLength;
+    ValidationUtils.checkArgument(groups != 0,
+        String.format("%s (%d) is greater than numPartitions %d.",
+            FlinkOptions.DEFAULT_PARALLELISM_PER_PARTITION.key(), groupLength, numPartitions));
+    groupNumber = groups;
+    remaining = numPartitions - groups * groupLength;
+    layoutNumPartitions = numPartitions;
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index e5fdf786e224..a540ea87e317 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.utils;
 import org.apache.hudi.client.model.HoodieFlinkInternalRow;
 import org.apache.hudi.client.model.HoodieFlinkInternalRowTypeInfo;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
@@ -206,6 +207,12 @@ public class Pipelines {
       throw new HoodieNotSupportedException("Bucket index supports only upsert 
operation. Please, use upsert operation or switch to another index type.");
     }
 
+    Option<Partitioner> insertPartitioner = 
OptionsResolver.getInsertPartitioner(conf);
+    if (insertPartitioner.isPresent()) {
+      RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+      dataStream = dataStream.partitionCustom(insertPartitioner.get(), 
rowDataKeyGen::getHoodieKey);
+    }
+
     WriteOperatorFactory<RowData> operatorFactory = 
AppendWriteOperator.getFactory(conf, rowType);
 
     return dataStream
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGroupedInsertPartitioner.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGroupedInsertPartitioner.java
new file mode 100644
index 000000000000..79df70d29823
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGroupedInsertPartitioner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link GroupedInsertPartitioner}.
+ */
+public class TestGroupedInsertPartitioner {
+
+  /**
+   * Routes 20 records for each of 1030 data partitions to 2023 tasks and checks that every returned
+   * task index is valid and that at least all the tasks of the evenly sized groups receive data.
+   */
+  @Test
+  void testPartitioner() {
+    Configuration conf = new Configuration();
+    int para = 30;
+    conf.set(FlinkOptions.DEFAULT_PARALLELISM_PER_PARTITION, para);
+    GroupedInsertPartitioner<HoodieKey> partitioner = new GroupedInsertPartitioner<>(conf);
+    int numberFlinkPartitions = 2023;
+    int numberDataPartition = 1030;
+    int recordsPerPartition = 20;
+    // task index -> number of records routed to that task
+    HashMap<Integer, Integer> res = new HashMap<>();
+    String partitionPath = "dt=2023-11-27/hour=01/index=";
+    for (int partitionIndex = 0; partitionIndex < numberDataPartition; partitionIndex++) {
+      for (int recordIndex = 0; recordIndex < recordsPerPartition; recordIndex++) {
+        int id = partitioner.partition(new HoodieKey("id" + recordIndex, partitionPath + partitionIndex), numberFlinkPartitions);
+        assertTrue(id >= 0 && id < numberFlinkPartitions, "Task index out of range: " + id);
+        res.merge(id, 1, Integer::sum);
+      }
+    }
+
+    assertTrue(res.size() <= numberFlinkPartitions
+        && res.size() >= (numberFlinkPartitions - numberFlinkPartitions % para));
+  }
+
+}
\ No newline at end of file

Reply via email to