This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1f0dfb37d39a fix(flink): Add customized partitioner for global RLI
bucket assigner operator (#18725)
1f0dfb37d39a is described below
commit 1f0dfb37d39a1fe2ed09045f292457746cab4a5b
Author: Shuo Cheng <[email protected]>
AuthorDate: Wed May 13 18:54:58 2026 +0800
fix(flink): Add customized partitioner for global RLI bucket assigner
operator (#18725)
---
.../java/org/apache/hudi/sink/utils/Pipelines.java | 3 +-
.../org/apache/hudi/sink/utils/TestPipelines.java | 91 ++++++++++++++++++++++
2 files changed, 93 insertions(+), 1 deletion(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 44e9dfd335cc..45e629df5a7e 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -58,10 +58,10 @@ import
org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketIndexPartitioner;
import org.apache.hudi.sink.partitioner.DynamicBucketAssignFunction;
import org.apache.hudi.sink.partitioner.DynamicBucketAssignOperator;
+import org.apache.hudi.sink.partitioner.GlobalRecordIndexPartitioner;
import org.apache.hudi.sink.partitioner.MiniBatchBucketAssignOperator;
import org.apache.hudi.sink.partitioner.MinibatchBucketAssignFunction;
import org.apache.hudi.sink.partitioner.RecordIndexPartitioner;
-import org.apache.hudi.sink.partitioner.GlobalRecordIndexPartitioner;
import org.apache.hudi.sink.partitioner.index.IndexRowUtils;
import org.apache.hudi.sink.partitioner.index.IndexWriteOperator;
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
@@ -463,6 +463,7 @@ public class Pipelines {
String assignerOperatorName = "bucket_assigner";
if (OptionsResolver.isGlobalRecordLevelIndex(conf) &&
!conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
return inputStream
+ .partitionCustom(new GlobalRecordIndexPartitioner(conf), row -> new
HoodieKey(row.getRecordKey(), row.getPartitionPath()))
.transform(
assignerOperatorName,
new HoodieFlinkInternalRowTypeInfo(rowType),
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestPipelines.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestPipelines.java
new file mode 100644
index 000000000000..97e3703df986
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestPipelines.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.sink.partitioner.GlobalRecordIndexPartitioner;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for {@link Pipelines}.
+ */
+public class TestPipelines {
+
+ @TempDir
+ File tempFile;
+
+ @Test
+ void testGlobalRLIShufflesBucketAssignByGlobalRecordIndex() throws Exception
{
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.INDEX_TYPE,
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
+ conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, false);
+
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
"true");
+ conf.setString(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(), "true");
+ conf.set(FlinkOptions.BUCKET_ASSIGN_TASKS, 4);
+ conf.set(FlinkOptions.WRITE_TASKS, 4);
+ conf.set(FlinkOptions.INDEX_WRITE_TASKS, 4);
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<HoodieFlinkInternalRow> inputStream = env.fromCollection(
+ Collections.<HoodieFlinkInternalRow>emptyList(), new
HoodieFlinkInternalRowTypeInfo(TestConfigurations.ROW_TYPE));
+ DataStream<RowData> pipeline = Pipelines.hoodieStreamWrite(conf,
TestConfigurations.ROW_TYPE, inputStream);
+
+ assertEquals(2, countCustomPartitions(pipeline,
GlobalRecordIndexPartitioner.class));
+ }
+
+ private long countCustomPartitions(DataStream<?> stream, Class<?>
partitionerClass) throws Exception {
+ long count = 0;
+ for (Transformation<?> transformation :
stream.getTransformation().getTransitivePredecessors()) {
+ if (transformation instanceof PartitionTransformation) {
+ StreamPartitioner<?> partitioner = ((PartitionTransformation<?>)
transformation).getPartitioner();
+ if (partitioner instanceof CustomPartitionerWrapper
+ &&
partitionerClass.isInstance(getCustomPartitioner((CustomPartitionerWrapper<?,
?>) partitioner))) {
+ count++;
+ }
+ }
+ }
+ return count;
+ }
+
+ private Object getCustomPartitioner(CustomPartitionerWrapper<?, ?>
partitionerWrapper) throws Exception {
+ Field partitionerField =
CustomPartitionerWrapper.class.getDeclaredField("partitioner");
+ partitionerField.setAccessible(true);
+ return partitionerField.get(partitionerWrapper);
+ }
+}