This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 059649c51fbb perf(flink): Parse bucket index hash-field config once
instead of per ... (#18993)
059649c51fbb is described below
commit 059649c51fbbfeca809825548efa486d5ef80bdb
Author: voonhous <[email protected]>
AuthorDate: Wed Jun 17 11:21:45 2026 +0800
perf(flink): Parse bucket index hash-field config once instead of per ...
(#18993)
* perf(flink): Parse bucket index hash-field config once instead of per
record
Flink mirror of #18979 (issue #18978). The bucket-index write paths re-split
the comma-separated hoodie.bucket.index.hash.field config per record via the
String overload of BucketIdentifier.getBucketId. Precompute
KeyGenUtils.getIndexKeyFields(...) once and call the List overload:
- BucketIndexPartitioner / BucketIndexRemotePartitioner: parse once in the
constructor (factory and constructor signatures stay String-based at this
step; the final change in this series switches them to List<String>)
- BucketStreamWriteFunction: parse once in open()
- BucketBulkInsertWriterHelper.rowWithFileId/getFileId: take List<String>;
Pipelines parses once before the per-record map stage
Behavior-preserving (same bucket ids).
* refactor(flink): Add OptionsResolver.getIndexKeyFields list accessor
Add OptionsResolver.getIndexKeyFields(Configuration) returning the parsed
List<String> and use it in BucketStreamWriteFunction.open() instead of
KeyGenUtils.getIndexKeyFields(OptionsResolver.getIndexKeyField(config)).
* perf(flink): Build NumBucketsFunction once for bulk insert instead of per
record
BucketBulkInsertWriterHelper.getFileId/rowWithFileId rebuilt
NumBucketsFunction from three conf.get(...) lookups on every record in the
bulk-insert map, and its constructor logs per record. Construct it once in
Pipelines.bulkInsert (and the test wrapper) and pass it through; the map
closure captures it since it is Serializable, mirroring the sibling
bucket-index partitioners.
Behavior-preserving.
* style(flink): Wrap long getFileId/rowWithFileId signatures under 200 chars
Fix checkstyle LineLength violations introduced when the NumBucketsFunction
parameter was added; wrap both signatures with paren-aligned continuations.
* review(flink): Use OptionsResolver.getIndexKeyFields in
Pipelines.bulkInsert
Use the OptionsResolver.getIndexKeyFields(conf) accessor for the parsed list
in Pipelines.bulkInsert and drop the now-unused KeyGenUtils import; at this
step the String form is still kept for BucketIndexPartitionerFactory.create
(superseded by the final change in this series, which passes the parsed list).
* review(flink): Use OptionsResolver.getIndexKeyFields in
BulkInsertFunctionWrapper
Mirror the Pipelines change in the test wrapper: use
OptionsResolver.getIndexKeyFields(conf) for the parsed list and drop the
now-unused indexKeys local and KeyGenUtils import.
* review(flink): pass parsed index key fields list to
BucketIndexPartitionerFactory.create
---
.../org/apache/hudi/configuration/OptionsResolver.java | 7 +++++++
.../hudi/sink/bucket/BucketBulkInsertWriterHelper.java | 15 +++++++--------
.../hudi/sink/bucket/BucketStreamWriteFunction.java | 9 ++++++---
.../hudi/sink/partitioner/BucketIndexPartitioner.java | 12 ++++++++----
.../sink/partitioner/BucketIndexPartitionerFactory.java | 10 ++++++----
.../sink/partitioner/BucketIndexRemotePartitioner.java | 16 ++++++++++------
.../main/java/org/apache/hudi/sink/utils/Pipelines.java | 12 +++++++++---
.../partitioner/TestBucketIndexPartitionerFactory.java | 6 ++++--
.../partitioner/TestBucketIndexRemotePartitioner.java | 4 +++-
.../hudi/sink/utils/BulkInsertFunctionWrapper.java | 7 +++++--
10 files changed, 65 insertions(+), 33 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 5089b4eea3c7..8853d5844984 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -560,6 +560,13 @@ public class OptionsResolver {
return conf.getString(FlinkOptions.INDEX_KEY_FIELD.key(),
getRecordKeyStr(conf));
}
+ /**
+ * Returns the index key fields as a list, parsing the comma-separated
config value once.
+ */
+ public static List<String> getIndexKeyFields(Configuration conf) {
+ return KeyGenUtils.getIndexKeyFields(getIndexKeyField(conf));
+ }
+
/**
* Returns the conflict resolution strategy.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index fad1f7e9272b..5c324235251c 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -19,7 +19,6 @@
package org.apache.hudi.sink.bucket;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
@@ -38,6 +37,7 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
/**
@@ -93,20 +93,19 @@ public class BucketBulkInsertWriterHelper extends
BulkInsertWriterHelper {
return new SortOperatorGen(rowType, new String[] {FILE_GROUP_META_FIELD});
}
- private static String getFileId(Map<String, String> bucketIdToFileId,
RowDataKeyGen keyGen, RowData record, String indexKeys, Configuration conf,
boolean needFixedFileIdSuffix) {
+ private static String getFileId(Map<String, String> bucketIdToFileId,
RowDataKeyGen keyGen, RowData record, List<String> indexKeyFields,
+ NumBucketsFunction numBucketsFunction,
boolean needFixedFileIdSuffix) {
String recordKey = keyGen.getRecordKey(record);
String partition = keyGen.getPartitionPath(record);
- NumBucketsFunction numBucketsFunction = new
NumBucketsFunction(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS),
conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_RULE),
- conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
-
final int numBuckets = numBucketsFunction.getNumBuckets(partition);
- final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeys,
numBuckets);
+ final int bucketNum = BucketIdentifier.getBucketId(recordKey,
indexKeyFields, numBuckets);
String bucketId = partition + bucketNum;
return bucketIdToFileId.computeIfAbsent(bucketId, k ->
needFixedFileIdSuffix ? BucketIdentifier.newBucketFileIdForNBCC(bucketNum) :
BucketIdentifier.newBucketFileIdPrefix(bucketNum));
}
- public static RowData rowWithFileId(Map<String, String> bucketIdToFileId,
RowDataKeyGen keyGen, RowData record, String indexKeys, Configuration conf,
boolean needFixedFileIdSuffix) {
- final String fileId = getFileId(bucketIdToFileId, keyGen, record,
indexKeys, conf, needFixedFileIdSuffix);
+ public static RowData rowWithFileId(Map<String, String> bucketIdToFileId,
RowDataKeyGen keyGen, RowData record, List<String> indexKeyFields,
+ NumBucketsFunction numBucketsFunction,
boolean needFixedFileIdSuffix) {
+ final String fileId = getFileId(bucketIdToFileId, keyGen, record,
indexKeyFields, numBucketsFunction, needFixedFileIdSuffix);
return GenericRowData.of(StringData.fromString(fileId), record);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 949753ef8094..280c25fb9947 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -41,6 +41,7 @@ import org.apache.flink.util.Collector;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -56,7 +57,9 @@ public class BucketStreamWriteFunction extends
StreamWriteFunction {
private int parallelism;
- private String indexKeyFields;
+ // parsed once in open(); the per-record defineRecordLocation path uses the
List overload of
+ // getBucketId so the comma-separated config string is not re-split per
record
+ private List<String> indexKeyFieldList;
private boolean isNonBlockingConcurrencyControl;
@@ -102,7 +105,7 @@ public class BucketStreamWriteFunction extends
StreamWriteFunction {
@Override
public void open(Configuration parameters) throws IOException {
super.open(parameters);
- this.indexKeyFields = OptionsResolver.getIndexKeyField(config);
+ this.indexKeyFieldList = OptionsResolver.getIndexKeyFields(config);
this.isNonBlockingConcurrencyControl =
OptionsResolver.isNonBlockingConcurrencyControl(config);
this.taskID =
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
this.parallelism =
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
@@ -144,7 +147,7 @@ public class BucketStreamWriteFunction extends
StreamWriteFunction {
bootstrapIndexIfNeed(partition);
}
Map<Integer, String> bucketToFileId =
bucketIndex.computeIfAbsent(partition, p -> new HashMap<>());
- final int bucketNum = BucketIdentifier.getBucketId(record.getRecordKey(),
indexKeyFields, numBucketsFunction.getNumBuckets(record.getPartitionPath()));
+ final int bucketNum = BucketIdentifier.getBucketId(record.getRecordKey(),
indexKeyFieldList, numBucketsFunction.getNumBuckets(record.getPartitionPath()));
final String bucketId = partition + "/" + bucketNum;
if (incBucketIndex.contains(bucketId)) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
index fb5b3fdb6f0f..d63ebbbed86a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
@@ -28,6 +28,8 @@ import
org.apache.hudi.index.bucket.partition.NumBucketsFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.configuration.Configuration;
+import java.util.List;
+
/**
* Bucket index input partitioner.
* The fields to hash can be a subset of the primary key fields.
@@ -36,13 +38,15 @@ import org.apache.flink.configuration.Configuration;
*/
public class BucketIndexPartitioner<T extends HoodieKey> implements
Partitioner<T> {
- private final String indexKeyFields;
+ // Index key fields, pre-parsed by the caller. The per-record partition()
path uses the List
+ // overload of getBucketId so the comma-separated config string is never
re-split per record.
+ private final List<String> indexKeyFieldList;
private final NumBucketsFunction numBucketsFunction;
private Functions.Function3<Integer, String, Integer, Integer>
partitionIndexFunc;
- public BucketIndexPartitioner(Configuration conf, String indexKeyFields) {
- this.indexKeyFields = indexKeyFields;
+ public BucketIndexPartitioner(Configuration conf, List<String>
indexKeyFieldList) {
+ this.indexKeyFieldList = indexKeyFieldList;
this.numBucketsFunction = new
NumBucketsFunction(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS),
conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_RULE),
conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
}
@@ -53,7 +57,7 @@ public class BucketIndexPartitioner<T extends HoodieKey>
implements Partitioner<
this.partitionIndexFunc =
BucketIndexUtil.getPartitionIndexFunc(numPartitions);
}
int numBuckets = numBucketsFunction.getNumBuckets(key.getPartitionPath());
- int curBucket = BucketIdentifier.getBucketId(key.getRecordKey(),
indexKeyFields, numBuckets);
+ int curBucket = BucketIdentifier.getBucketId(key.getRecordKey(),
indexKeyFieldList, numBuckets);
return this.partitionIndexFunc.apply(numBuckets, key.getPartitionPath(),
curBucket);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitionerFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitionerFactory.java
index eb194643ae1d..3943e8455a10 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitionerFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitionerFactory.java
@@ -24,6 +24,8 @@ import org.apache.hudi.configuration.OptionsResolver;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.configuration.Configuration;
+import java.util.List;
+
/**
* Factory for simple bucket index partitioners.
*/
@@ -33,12 +35,12 @@ public class BucketIndexPartitionerFactory {
}
public static Partitioner<HoodieKey> create(Configuration conf) {
- return create(conf, OptionsResolver.getIndexKeyField(conf));
+ return create(conf, OptionsResolver.getIndexKeyFields(conf));
}
- public static Partitioner<HoodieKey> create(Configuration conf, String
indexKeyFields) {
+ public static Partitioner<HoodieKey> create(Configuration conf, List<String>
indexKeyFieldList) {
return OptionsResolver.shouldUseBucketRemotePartitioner(conf)
- ? new BucketIndexRemotePartitioner<>(conf, indexKeyFields)
- : new BucketIndexPartitioner<>(conf, indexKeyFields);
+ ? new BucketIndexRemotePartitioner<>(conf, indexKeyFieldList)
+ : new BucketIndexPartitioner<>(conf, indexKeyFieldList);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexRemotePartitioner.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexRemotePartitioner.java
index 0ef8378c3519..db70152a2a70 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexRemotePartitioner.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexRemotePartitioner.java
@@ -30,6 +30,8 @@ import org.apache.hudi.util.ViewStorageProperties;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.configuration.Configuration;
+import java.util.List;
+
/**
* Bucket index input partitioner backed by the embedded timeline service.
*
@@ -38,20 +40,22 @@ import org.apache.flink.configuration.Configuration;
public class BucketIndexRemotePartitioner<T extends HoodieKey> implements
Partitioner<T> {
private final Configuration conf;
- private final String indexKeyFields;
+ // Index key fields, pre-parsed by the caller. The per-record partition()
path uses the List
+ // overload of getBucketId so the comma-separated config string is never
re-split per record.
+ private final List<String> indexKeyFieldList;
private final NumBucketsFunction numBucketsFunction;
private transient RemotePartitionHelper remotePartitionHelper;
- public BucketIndexRemotePartitioner(Configuration conf, String
indexKeyFields) {
+ public BucketIndexRemotePartitioner(Configuration conf, List<String>
indexKeyFieldList) {
this.conf = conf;
- this.indexKeyFields = indexKeyFields;
+ this.indexKeyFieldList = indexKeyFieldList;
this.numBucketsFunction = new
NumBucketsFunction(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS),
conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_RULE),
conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
}
- BucketIndexRemotePartitioner(Configuration conf, String indexKeyFields,
RemotePartitionHelper remotePartitionHelper) {
- this(conf, indexKeyFields);
+ BucketIndexRemotePartitioner(Configuration conf, List<String>
indexKeyFieldList, RemotePartitionHelper remotePartitionHelper) {
+ this(conf, indexKeyFieldList);
this.remotePartitionHelper = remotePartitionHelper;
}
@@ -59,7 +63,7 @@ public class BucketIndexRemotePartitioner<T extends
HoodieKey> implements Partit
public int partition(T key, int numPartitions) {
String partitionPath = normalizePartitionPath(key.getPartitionPath());
int numBuckets = numBucketsFunction.getNumBuckets(partitionPath);
- int curBucket = BucketIdentifier.getBucketId(key.getRecordKey(),
indexKeyFields, numBuckets);
+ int curBucket = BucketIdentifier.getBucketId(key.getRecordKey(),
indexKeyFieldList, numBuckets);
return doGetRemotePartition(getRemotePartitionHelper(), numBuckets,
partitionPath, curBucket, numPartitions);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 8c0975c9943d..7caacf49cf64 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -28,6 +28,7 @@ import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperator;
import org.apache.hudi.sink.append.AppendWriteFunctions;
@@ -85,6 +86,7 @@ import org.apache.flink.table.types.logical.RowType;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
@@ -139,8 +141,12 @@ public class Pipelines {
throw new HoodieException(
"Consistent hashing bucket index does not work with bulk insert
using FLINK engine. Use simple bucket index or Spark engine.");
}
- String indexKeys = OptionsResolver.getIndexKeyField(conf);
- Partitioner<HoodieKey> partitioner =
BucketIndexPartitionerFactory.create(conf, indexKeys);
+ List<String> indexKeyFieldList = OptionsResolver.getIndexKeyFields(conf);
+ // built once and captured by the per-record map closure
(NumBucketsFunction is Serializable),
+ // avoiding a per-record rebuild from conf inside
BucketBulkInsertWriterHelper
+ NumBucketsFunction numBucketsFunction = new
NumBucketsFunction(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS),
+ conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_RULE),
conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
+ Partitioner<HoodieKey> partitioner =
BucketIndexPartitionerFactory.create(conf, indexKeyFieldList);
RowDataKeyGen keyGen = RowDataKeyGens.instance(conf, rowType);
RowType rowTypeWithFileId =
BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
InternalTypeInfo<RowData> typeInfo =
InternalTypeInfo.of(rowTypeWithFileId);
@@ -148,7 +154,7 @@ public class Pipelines {
Map<String, String> bucketIdToFileId = new HashMap<>();
dataStream = dataStream.partitionCustom(partitioner,
keyGen::getHoodieKey)
- .map(record ->
BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record,
indexKeys, conf, needFixedFileIdSuffix), typeInfo)
+ .map(record ->
BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record,
indexKeyFieldList, numBucketsFunction, needFixedFileIdSuffix), typeInfo)
.setParallelism(PARALLELISM_VALUE);
if (conf.get(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
SortOperatorGen sortOperatorGen =
BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketIndexPartitionerFactory.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketIndexPartitionerFactory.java
index b07cc4f434ef..9aad1821a224 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketIndexPartitionerFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketIndexPartitionerFactory.java
@@ -27,6 +27,8 @@ import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.Test;
+import java.util.Collections;
+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
/**
@@ -36,7 +38,7 @@ class TestBucketIndexPartitionerFactory {
@Test
void testCreateLocalBucketIndexPartitioner() {
- Partitioner<HoodieKey> partitioner =
BucketIndexPartitionerFactory.create(getSimpleBucketConf(), "id");
+ Partitioner<HoodieKey> partitioner =
BucketIndexPartitionerFactory.create(getSimpleBucketConf(),
Collections.singletonList("id"));
assertInstanceOf(BucketIndexPartitioner.class, partitioner);
}
@@ -46,7 +48,7 @@ class TestBucketIndexPartitionerFactory {
Configuration conf = getSimpleBucketConf();
conf.setString(HoodieIndexConfig.BUCKET_PARTITIONER.key(), "true");
- Partitioner<HoodieKey> partitioner =
BucketIndexPartitionerFactory.create(conf, "id");
+ Partitioner<HoodieKey> partitioner =
BucketIndexPartitionerFactory.create(conf, Collections.singletonList("id"));
assertInstanceOf(BucketIndexRemotePartitioner.class, partitioner);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketIndexRemotePartitioner.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketIndexRemotePartitioner.java
index b693228a796b..b3dda319ea9e 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketIndexRemotePartitioner.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketIndexRemotePartitioner.java
@@ -28,6 +28,8 @@ import
org.apache.hudi.index.bucket.partition.NumBucketsFunction;
import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.Test;
+import java.util.Collections;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
@@ -91,7 +93,7 @@ class TestBucketIndexRemotePartitioner {
when(remotePartitionHelper.getPartition(8, "", currentBucket,
16)).thenReturn(11);
BucketIndexRemotePartitioner<HoodieKey> partitioner =
- new BucketIndexRemotePartitioner<>(conf, "id", remotePartitionHelper);
+ new BucketIndexRemotePartitioner<>(conf,
Collections.singletonList("id"), remotePartitionHelper);
assertEquals(11, partitioner.partition(key, 16));
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
index 159ead016fb5..1d5cc2fc9a97 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
import org.apache.hudi.sink.bulk.BulkInsertWriteFunction;
@@ -213,10 +214,12 @@ public class BulkInsertFunctionWrapper<I> implements
TestFunctionWrapper<I> {
private void setupMapFunction() {
RowDataKeyGen keyGen = RowDataKeyGens.instance(conf, rowType);
- String indexKeys = OptionsResolver.getIndexKeyField(conf);
+ List<String> indexKeyFieldList = OptionsResolver.getIndexKeyFields(conf);
+ NumBucketsFunction numBucketsFunction = new
NumBucketsFunction(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS),
+ conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_RULE),
conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
boolean needFixedFileIdSuffix =
OptionsResolver.isNonBlockingConcurrencyControl(conf);
this.bucketIdToFileId = new HashMap<>();
- this.mapFunction = r ->
BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, r,
indexKeys, conf, needFixedFileIdSuffix);
+ this.mapFunction = r ->
BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, r,
indexKeyFieldList, numBucketsFunction, needFixedFileIdSuffix);
}
private void setupSortOperator() throws Exception {