This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6ae5b4b0d9b9 refactor: [HUDI-9335] Make `RowDataKeyGens::instance` the common point for keygen instantiation for Flink (#13570)
6ae5b4b0d9b9 is described below
commit 6ae5b4b0d9b9e5dd2f22b4623fc651c39adf8f2c
Author: Geser Dugarov <[email protected]>
AuthorDate: Fri Dec 5 11:09:00 2025 +0700
refactor: [HUDI-9335] Make `RowDataKeyGens::instance` the common point for keygen instantiation for Flink (#13570)
---
.../org/apache/hudi/sink/StreamWriteFunction.java | 3 +-
.../org/apache/hudi/sink/bulk/RowDataKeyGen.java | 2 +-
.../org/apache/hudi/sink/bulk/RowDataKeyGens.java | 18 ++++++-
.../sink/transform/RowDataToHoodieFunction.java | 3 +-
.../java/org/apache/hudi/sink/utils/Pipelines.java | 9 ++--
...tRowDataKeyGen.java => TestRowDataKeyGens.java} | 56 +++++++++++++++++-----
.../hudi/sink/utils/BulkInsertFunctionWrapper.java | 3 +-
7 files changed, 72 insertions(+), 22 deletions(-)
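In short, after this refactor callers obtain a key generator through `RowDataKeyGens.instance` rather than `RowDataKeyGen.instance`; when no record key field is configured, the factory falls back to `AutoRowDataKeyGen`, which additionally needs a task id and instant time. A minimal caller-side sketch, assuming a Flink `Configuration conf`, a `RowType rowType`, an `int taskId`, and a `String instantTime` are already in scope:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.hudi.sink.bulk.RowDataKeyGen;
    import org.apache.hudi.sink.bulk.RowDataKeyGens;

    // With FlinkOptions.RECORD_KEY_FIELD configured, the factory returns a plain RowDataKeyGen.
    RowDataKeyGen keyGen = RowDataKeyGens.instance(conf, rowType);

    // Without record keys, AutoRowDataKeyGen is used instead; taskId and instantTime must be
    // supplied, otherwise a HoodieValidationException is thrown.
    RowDataKeyGen autoKeyGen = RowDataKeyGens.instance(conf, rowType, taskId, instantTime);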
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 4564ddb81607..5f4b43a81ac4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -36,6 +36,7 @@ import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
import org.apache.hudi.sink.buffer.RowDataBucket;
import org.apache.hudi.sink.buffer.TotalSizeTracer;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.bulk.RowDataKeyGens;
import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.exception.MemoryPagesExhaustedException;
@@ -151,7 +152,7 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction<HoodieFlink
public StreamWriteFunction(Configuration config, RowType rowType) {
super(config);
this.rowType = rowType;
- this.keyGen = RowDataKeyGen.instance(config, rowType);
+ this.keyGen = RowDataKeyGens.instance(config, rowType);
}
@Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
index 132eb3f799e5..8f0a213040a7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
@@ -142,7 +142,7 @@ public class RowDataKeyGen implements Serializable {
this.keyGenOpt = keyGenOpt;
}
- public static RowDataKeyGen instance(Configuration conf, RowType rowType) {
+ static RowDataKeyGen instance(Configuration conf, RowType rowType) {
Option<TimestampBasedAvroKeyGenerator> keyGeneratorOpt = Option.empty();
if (TimestampBasedAvroKeyGenerator.class.getName().equals(conf.get(FlinkOptions.KEYGEN_CLASS_NAME))) {
try {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
index 6b36ae45d62f..df0e553feb29 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
@@ -19,10 +19,13 @@
package org.apache.hudi.sink.bulk;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieValidationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.types.logical.RowType;
+import javax.annotation.Nullable;
+
import java.util.List;
/**
@@ -31,17 +34,28 @@ import java.util.List;
public class RowDataKeyGens {
/**
- * Creates a {@link RowDataKeyGen} with given configuration.
+ * Creates {@link RowDataKeyGen} of corresponding type depending on table configuration.
*/
- public static RowDataKeyGen instance(Configuration conf, RowType rowType, int taskId, String instantTime) {
+ public static RowDataKeyGen instance(Configuration conf, RowType rowType, @Nullable Integer taskId, @Nullable String instantTime) {
String recordKeys = conf.get(FlinkOptions.RECORD_KEY_FIELD);
if (hasRecordKey(recordKeys, rowType.getFieldNames())) {
return RowDataKeyGen.instance(conf, rowType);
} else {
+ if (null == taskId || null == instantTime) {
+ throw new HoodieValidationException(
+ String.format("'taskId' and 'instantTime' cannot be null to use AutoRowDataKeyGen. 'taskId' = %s, 'instantTime' = %s", taskId, instantTime));
+ }
return AutoRowDataKeyGen.instance(conf, rowType, taskId, instantTime);
}
}
+ /**
+ * Creates {@link RowDataKeyGen} of a type, which doesn't expect parameters besides table configuration and row type.
+ */
+ public static RowDataKeyGen instance(Configuration conf, RowType rowType) {
+ return instance(conf, rowType, null, null);
+ }
+
/**
* Checks whether user provides any record key.
*/
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
index 3699f8fd1c6e..4b065d5ed32a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
@@ -22,6 +22,7 @@ import org.apache.hudi.adapter.AbstractRichFunctionAdapter;
import org.apache.hudi.client.model.HoodieFlinkInternalRow;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.bulk.RowDataKeyGens;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
@@ -36,7 +37,7 @@ public class RowDataToHoodieFunction<I extends RowData, O extends HoodieFlinkInt
RowDataKeyGen keyGen;
public RowDataToHoodieFunction(RowType rowType, Configuration config) {
- this.keyGen = RowDataKeyGen.instance(config, rowType);
+ this.keyGen = RowDataKeyGens.instance(config, rowType);
}
@Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 7f852f2a5455..15ae62844cea 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -38,6 +38,7 @@ import org.apache.hudi.sink.bucket.BucketStreamWriteOperator;
import org.apache.hudi.sink.bucket.ConsistentBucketAssignFunction;
import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.bulk.RowDataKeyGens;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.hudi.sink.clustering.ClusteringCommitSink;
@@ -121,7 +122,7 @@ public class Pipelines {
}
String indexKeys = OptionsResolver.getIndexKeyField(conf);
BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(conf, indexKeys);
- RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
+ RowDataKeyGen keyGen = RowDataKeyGens.instance(conf, rowType);
RowType rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
boolean needFixedFileIdSuffix = OptionsResolver.isNonBlockingConcurrencyControl(conf);
@@ -146,7 +147,7 @@ public class Pipelines {
// see BatchExecutionUtils#applyBatchExecutionSettings for details.
Partitioner<String> partitioner = (key, channels) -> KeyGroupRangeAssignment.assignKeyToParallelOperator(key, KeyGroupRangeAssignment.computeDefaultMaxParallelism(PARALLELISM_VALUE), channels);
- RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+ RowDataKeyGen rowDataKeyGen = RowDataKeyGens.instance(conf, rowType);
dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getPartitionPath);
}
@@ -208,7 +209,7 @@ public class Pipelines {
Option<Partitioner> insertPartitioner = OptionsResolver.getInsertPartitioner(conf);
if (insertPartitioner.isPresent()) {
- RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+ RowDataKeyGen rowDataKeyGen = RowDataKeyGens.instance(conf, rowType);
dataStream = dataStream.partitionCustom(insertPartitioner.get(), rowDataKeyGen::getHoodieKey);
}
@@ -288,7 +289,7 @@ public class Pipelines {
Configuration conf,
RowType rowType,
DataStream<RowData> dataStream) {
- final RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+ final RowDataKeyGen rowDataKeyGen = RowDataKeyGens.instance(conf, rowType);
// shuffle by partition keys
dataStream = dataStream
.keyBy(rowDataKeyGen::getPartitionPath);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java
similarity index 82%
rename from hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
rename to hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java
index 098f8036f85c..18b85fde733d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java
@@ -21,10 +21,12 @@ package org.apache.hudi.sink.bulk;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.HoodieTableFactory;
import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
@@ -36,6 +38,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.sql.Timestamp;
+import java.util.Collections;
import static org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_INPUT_DATE_FORMAT;
import static org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_DATE_FORMAT;
@@ -44,18 +47,19 @@ import static org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTI
import static org.apache.hudi.utils.TestData.insertRow;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertLinesMatch;
import static org.junit.jupiter.api.Assertions.assertThrows;
/**
* Test cases for {@link RowDataKeyGen}.
*/
-public class TestRowDataKeyGen {
+public class TestRowDataKeyGens {
@Test
void testSimpleKeyAndPartition() {
Configuration conf = TestConfigurations.getDefaultConf("path1");
final RowData rowData1 = insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, TimestampData.fromEpochMillis(1), StringData.fromString("par1"));
- final RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE);
+ final RowDataKeyGen keyGen1 = RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE);
assertThat(keyGen1.getRecordKey(rowData1), is("id1"));
assertThat(keyGen1.getPartitionPath(rowData1), is("par1"));
@@ -72,7 +76,7 @@ public class TestRowDataKeyGen {
// hive style partitioning
conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
- final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE);
+ final RowDataKeyGen keyGen2 = RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE);
assertThat(keyGen2.getPartitionPath(rowData1), is(String.format("partition=%s", "par1")));
assertThat(keyGen2.getPartitionPath(rowData2), is(String.format("partition=%s", DEFAULT_PARTITION_PATH)));
assertThat(keyGen2.getPartitionPath(rowData3), is(String.format("partition=%s", DEFAULT_PARTITION_PATH)));
@@ -118,7 +122,7 @@ public class TestRowDataKeyGen {
conf.set(FlinkOptions.PARTITION_PATH_FIELD, "partition,ts");
RowData rowData1 = insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, TimestampData.fromEpochMillis(1), StringData.fromString("par1"));
- RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE);
+ RowDataKeyGen keyGen1 = RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE);
assertThat(keyGen1.getRecordKey(rowData1), is("uuid:id1,name:Danny"));
assertThat(keyGen1.getPartitionPath(rowData1), is("par1/1970-01-01T00:00:00.001"));
@@ -134,7 +138,7 @@ public class TestRowDataKeyGen {
// hive style partitioning
conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
- final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE);
+ final RowDataKeyGen keyGen2 = RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE);
assertThat(keyGen2.getPartitionPath(rowData1), is(String.format("partition=%s/ts=%s", "par1", "1970-01-01T00:00:00.001")));
assertThat(keyGen2.getPartitionPath(rowData2), is(String.format("partition=%s/ts=%s", DEFAULT_PARTITION_PATH, DEFAULT_PARTITION_PATH)));
assertThat(keyGen2.getPartitionPath(rowData3), is(String.format("partition=%s/ts=%s", DEFAULT_PARTITION_PATH, "1970-01-01T00:00:00.001")));
@@ -147,7 +151,7 @@ public class TestRowDataKeyGen {
HoodieTableFactory.setupTimestampKeygenOptions(conf, DataTypes.TIMESTAMP(3));
final RowData rowData1 = insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, TimestampData.fromEpochMillis(7200000), StringData.fromString("par1"));
- final RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE);
+ final RowDataKeyGen keyGen1 = RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE);
assertThat(keyGen1.getRecordKey(rowData1), is("id1"));
assertThat(keyGen1.getPartitionPath(rowData1), is("1970010102"));
@@ -165,7 +169,7 @@ public class TestRowDataKeyGen {
// hive style partitioning
conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
- final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE);
+ final RowDataKeyGen keyGen2 = RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE);
assertThat(keyGen2.getPartitionPath(rowData1), is("ts=1970010102"));
assertThat(keyGen2.getPartitionPath(rowData2), is("ts=1970010100"));
assertThat(keyGen2.getPartitionPath(rowData3), is("ts=1970010100"));
@@ -179,7 +183,7 @@ public class TestRowDataKeyGen {
conf.setString(TIMESTAMP_OUTPUT_DATE_FORMAT.key(), "yyyy-MM-dd");
final RowData rowData4 = insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, TimestampData.fromEpochMillis(7200000), StringData.fromString("2004-02-29 01:02:03"));
- final RowDataKeyGen keyGen3 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE);
+ final RowDataKeyGen keyGen3 = RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE);
assertThat(keyGen3.getPartitionPath(rowData4), is("2004-02-29"));
}
@@ -193,7 +197,7 @@ public class TestRowDataKeyGen {
HoodieTableFactory.setupTimestampKeygenOptions(conf, DataTypes.DATE());
final RowData rowData1 = insertRow(TestConfigurations.ROW_TYPE_DATE, StringData.fromString("id1"), StringData.fromString("Danny"), 23, 1);
- final RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE_DATE);
+ final RowDataKeyGen keyGen1 = RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE_DATE);
assertThat(keyGen1.getRecordKey(rowData1), is("id1"));
String expectedPartition1 = dashed ? "1970-01-02" : "19700102";
@@ -213,7 +217,7 @@ public class TestRowDataKeyGen {
// hive style partitioning
conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
- final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE_DATE);
+ final RowDataKeyGen keyGen2 = RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE_DATE);
assertThat(keyGen2.getPartitionPath(rowData1), is("dt=" + expectedPartition1));
assertThat(keyGen2.getPartitionPath(rowData2), is("dt=" + expectedPartition2));
assertThat(keyGen2.getPartitionPath(rowData3), is("dt=" + expectedPartition3));
@@ -249,14 +253,42 @@ public class TestRowDataKeyGen {
Timestamp ts = new Timestamp(1675841687000L);
final RowData rowData1 = insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, TimestampData.fromTimestamp(ts), StringData.fromString("par1"));
- final RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE);
+ final RowDataKeyGen keyGen1 = RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE);
assertThat(keyGen1.getRecordKey(rowData1), is("uuid:id1,ts:" + ts.toLocalDateTime().toString()));
conf.setString(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), "false");
- final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE);
+ final RowDataKeyGen keyGen2 = RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE);
assertThat(keyGen2.getRecordKey(rowData1), is("uuid:id1,ts:1675841687000"));
+ }
+
+ @Test
+ void testAutoKeyGenRecordKey() {
+ Configuration conf = TestConfigurations.getDefaultConf("path1");
+ // without record keys AutoRowDataKeyGen will be used, which expects taskId and instantTime parameters for instantiation
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "");
+
+ int taskId = 1;
+ String instantTime = "20250716145212986";
+ final AutoRowDataKeyGen autoKeyGen = (AutoRowDataKeyGen) RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE, taskId, instantTime);
+ assertThat(autoKeyGen.getRecordKey(TestData.DATA_SET_INSERT.get(0)), is(instantTime + "_" + taskId + "_0"));
+ assertThat(autoKeyGen.getRecordKey(TestData.DATA_SET_INSERT.get(1)), is(instantTime + "_" + taskId + "_1"));
+ }
+
+ @Test
+ void testAutoKeyGenNotAllowNulls() {
+ Configuration conf = TestConfigurations.getDefaultConf("path1");
+ // without record keys AutoRowDataKeyGen will be used, which expects taskId and instantTime parameters for instantiation
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "");
+ HoodieValidationException exNullInstant =
+ assertThrows(HoodieValidationException.class, () -> RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE, 1, null));
+ assertLinesMatch(Collections.singletonList("'taskId' and 'instantTime' cannot be null to use AutoRowDataKeyGen. 'taskId' = 1, 'instantTime' = null"),
+ Collections.singletonList(exNullInstant.getMessage()));
+ HoodieValidationException exNullTask =
+ assertThrows(HoodieValidationException.class, () -> RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE, null, "20250716145212986"));
+ assertLinesMatch(Collections.singletonList("'taskId' and 'instantTime' cannot be null to use AutoRowDataKeyGen. 'taskId' = null, 'instantTime' = 20250716145212986"),
+ Collections.singletonList(exNullTask.getMessage()));
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
index 73fd955d3579..d78bd234896e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
@@ -27,6 +27,7 @@ import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
import org.apache.hudi.sink.bulk.BulkInsertWriteFunction;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.bulk.RowDataKeyGens;
import org.apache.hudi.sink.bulk.sort.SortOperator;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.sink.common.AbstractWriteFunction;
@@ -217,7 +218,7 @@ public class BulkInsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
}
private void setupMapFunction() {
- RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
+ RowDataKeyGen keyGen = RowDataKeyGens.instance(conf, rowType);
String indexKeys = OptionsResolver.getIndexKeyField(conf);
boolean needFixedFileIdSuffix = OptionsResolver.isNonBlockingConcurrencyControl(conf);
this.bucketIdToFileId = new HashMap<>();