This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ae5b4b0d9b9 refactor: [HUDI-9335] Make `RowDataKeyGens::instance` the common point for keygen instantiation for Flink (#13570)
6ae5b4b0d9b9 is described below

commit 6ae5b4b0d9b9e5dd2f22b4623fc651c39adf8f2c
Author: Geser Dugarov <[email protected]>
AuthorDate: Fri Dec 5 11:09:00 2025 +0700

    refactor: [HUDI-9335] Make `RowDataKeyGens::instance` the common point for keygen instantiation for Flink (#13570)
---
 .../org/apache/hudi/sink/StreamWriteFunction.java  |  3 +-
 .../org/apache/hudi/sink/bulk/RowDataKeyGen.java   |  2 +-
 .../org/apache/hudi/sink/bulk/RowDataKeyGens.java  | 18 ++++++-
 .../sink/transform/RowDataToHoodieFunction.java    |  3 +-
 .../java/org/apache/hudi/sink/utils/Pipelines.java |  9 ++--
 ...tRowDataKeyGen.java => TestRowDataKeyGens.java} | 56 +++++++++++++++++-----
 .../hudi/sink/utils/BulkInsertFunctionWrapper.java |  3 +-
 7 files changed, 72 insertions(+), 22 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 4564ddb81607..5f4b43a81ac4 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -36,6 +36,7 @@ import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
 import org.apache.hudi.sink.buffer.RowDataBucket;
 import org.apache.hudi.sink.buffer.TotalSizeTracer;
 import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.bulk.RowDataKeyGens;
 import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.exception.MemoryPagesExhaustedException;
@@ -151,7 +152,7 @@ public class StreamWriteFunction extends 
AbstractStreamWriteFunction<HoodieFlink
   public StreamWriteFunction(Configuration config, RowType rowType) {
     super(config);
     this.rowType = rowType;
-    this.keyGen = RowDataKeyGen.instance(config, rowType);
+    this.keyGen = RowDataKeyGens.instance(config, rowType);
   }
 
   @Override
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
index 132eb3f799e5..8f0a213040a7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
@@ -142,7 +142,7 @@ public class RowDataKeyGen implements Serializable {
     this.keyGenOpt = keyGenOpt;
   }
 
-  public static RowDataKeyGen instance(Configuration conf, RowType rowType) {
+  static RowDataKeyGen instance(Configuration conf, RowType rowType) {
     Option<TimestampBasedAvroKeyGenerator> keyGeneratorOpt = Option.empty();
     if 
(TimestampBasedAvroKeyGenerator.class.getName().equals(conf.get(FlinkOptions.KEYGEN_CLASS_NAME)))
 {
       try {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
index 6b36ae45d62f..df0e553feb29 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
@@ -19,10 +19,13 @@
 package org.apache.hudi.sink.bulk;
 
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieValidationException;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.types.logical.RowType;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 
 /**
@@ -31,17 +34,28 @@ import java.util.List;
 public class RowDataKeyGens {
 
   /**
-   * Creates a {@link RowDataKeyGen} with given configuration.
+   * Creates {@link RowDataKeyGen} of corresponding type depending on table 
configuration.
    */
-  public static RowDataKeyGen instance(Configuration conf, RowType rowType, 
int taskId, String instantTime) {
+  public static RowDataKeyGen instance(Configuration conf, RowType rowType, 
@Nullable Integer taskId, @Nullable String instantTime) {
     String recordKeys = conf.get(FlinkOptions.RECORD_KEY_FIELD);
     if (hasRecordKey(recordKeys, rowType.getFieldNames())) {
       return RowDataKeyGen.instance(conf, rowType);
     } else {
+      if (null == taskId || null == instantTime) {
+        throw new HoodieValidationException(
+            String.format("'taskId' and 'instantTime' cannot be null to use 
AutoRowDataKeyGen. 'taskId' = %s, 'instantTime' = %s", taskId, instantTime));
+      }
       return AutoRowDataKeyGen.instance(conf, rowType, taskId, instantTime);
     }
   }
 
+  /**
+   * Creates {@link RowDataKeyGen} of a type, which doesn't expect parameters 
besides table configuration and row type.
+   */
+  public static RowDataKeyGen instance(Configuration conf, RowType rowType) {
+    return instance(conf, rowType, null, null);
+  }
+
   /**
    * Checks whether user provides any record key.
    */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
index 3699f8fd1c6e..4b065d5ed32a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
@@ -22,6 +22,7 @@ import org.apache.hudi.adapter.AbstractRichFunctionAdapter;
 import org.apache.hudi.client.model.HoodieFlinkInternalRow;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.bulk.RowDataKeyGens;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.configuration.Configuration;
@@ -36,7 +37,7 @@ public class RowDataToHoodieFunction<I extends RowData, O 
extends HoodieFlinkInt
   RowDataKeyGen keyGen;
 
   public RowDataToHoodieFunction(RowType rowType, Configuration config) {
-    this.keyGen = RowDataKeyGen.instance(config, rowType);
+    this.keyGen = RowDataKeyGens.instance(config, rowType);
   }
 
   @Override
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 7f852f2a5455..15ae62844cea 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -38,6 +38,7 @@ import org.apache.hudi.sink.bucket.BucketStreamWriteOperator;
 import org.apache.hudi.sink.bucket.ConsistentBucketAssignFunction;
 import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
 import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.bulk.RowDataKeyGens;
 import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
 import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
 import org.apache.hudi.sink.clustering.ClusteringCommitSink;
@@ -121,7 +122,7 @@ public class Pipelines {
       }
       String indexKeys = OptionsResolver.getIndexKeyField(conf);
       BucketIndexPartitioner<HoodieKey> partitioner = new 
BucketIndexPartitioner<>(conf, indexKeys);
-      RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
+      RowDataKeyGen keyGen = RowDataKeyGens.instance(conf, rowType);
       RowType rowTypeWithFileId = 
BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
       InternalTypeInfo<RowData> typeInfo = 
InternalTypeInfo.of(rowTypeWithFileId);
       boolean needFixedFileIdSuffix = 
OptionsResolver.isNonBlockingConcurrencyControl(conf);
@@ -146,7 +147,7 @@ public class Pipelines {
         // see BatchExecutionUtils#applyBatchExecutionSettings for details.
         Partitioner<String> partitioner = (key, channels) -> 
KeyGroupRangeAssignment.assignKeyToParallelOperator(key,
             
KeyGroupRangeAssignment.computeDefaultMaxParallelism(PARALLELISM_VALUE), 
channels);
-        RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+        RowDataKeyGen rowDataKeyGen = RowDataKeyGens.instance(conf, rowType);
         dataStream = dataStream.partitionCustom(partitioner, 
rowDataKeyGen::getPartitionPath);
       }
 
@@ -208,7 +209,7 @@ public class Pipelines {
 
     Option<Partitioner> insertPartitioner = 
OptionsResolver.getInsertPartitioner(conf);
     if (insertPartitioner.isPresent()) {
-      RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+      RowDataKeyGen rowDataKeyGen = RowDataKeyGens.instance(conf, rowType);
       dataStream = dataStream.partitionCustom(insertPartitioner.get(), 
rowDataKeyGen::getHoodieKey);
     }
 
@@ -288,7 +289,7 @@ public class Pipelines {
       Configuration conf,
       RowType rowType,
       DataStream<RowData> dataStream) {
-    final RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+    final RowDataKeyGen rowDataKeyGen = RowDataKeyGens.instance(conf, rowType);
     // shuffle by partition keys
     dataStream = dataStream
         .keyBy(rowDataKeyGen::getPartitionPath);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java
similarity index 82%
rename from 
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
rename to 
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java
index 098f8036f85c..18b85fde733d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java
@@ -21,10 +21,12 @@ package org.apache.hudi.sink.bulk;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.table.HoodieTableFactory;
 import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
@@ -36,6 +38,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.sql.Timestamp;
+import java.util.Collections;
 
 import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_INPUT_DATE_FORMAT;
 import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_DATE_FORMAT;
@@ -44,18 +47,19 @@ import static 
org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTI
 import static org.apache.hudi.utils.TestData.insertRow;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertLinesMatch;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /**
  * Test cases for {@link RowDataKeyGen}.
  */
-public class TestRowDataKeyGen {
+public class TestRowDataKeyGens {
   @Test
   void testSimpleKeyAndPartition() {
     Configuration conf = TestConfigurations.getDefaultConf("path1");
     final RowData rowData1 = insertRow(StringData.fromString("id1"), 
StringData.fromString("Danny"), 23,
         TimestampData.fromEpochMillis(1), StringData.fromString("par1"));
-    final RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, 
TestConfigurations.ROW_TYPE);
+    final RowDataKeyGen keyGen1 = RowDataKeyGens.instance(conf, 
TestConfigurations.ROW_TYPE);
     assertThat(keyGen1.getRecordKey(rowData1), is("id1"));
     assertThat(keyGen1.getPartitionPath(rowData1), is("par1"));
 
@@ -72,7 +76,7 @@ public class TestRowDataKeyGen {
 
     // hive style partitioning
     conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
-    final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, 
TestConfigurations.ROW_TYPE);
+    final RowDataKeyGen keyGen2 = RowDataKeyGens.instance(conf, 
TestConfigurations.ROW_TYPE);
     assertThat(keyGen2.getPartitionPath(rowData1), 
is(String.format("partition=%s", "par1")));
     assertThat(keyGen2.getPartitionPath(rowData2), 
is(String.format("partition=%s", DEFAULT_PARTITION_PATH)));
     assertThat(keyGen2.getPartitionPath(rowData3), 
is(String.format("partition=%s", DEFAULT_PARTITION_PATH)));
@@ -118,7 +122,7 @@ public class TestRowDataKeyGen {
     conf.set(FlinkOptions.PARTITION_PATH_FIELD, "partition,ts");
     RowData rowData1 = insertRow(StringData.fromString("id1"), 
StringData.fromString("Danny"), 23,
         TimestampData.fromEpochMillis(1), StringData.fromString("par1"));
-    RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, 
TestConfigurations.ROW_TYPE);
+    RowDataKeyGen keyGen1 = RowDataKeyGens.instance(conf, 
TestConfigurations.ROW_TYPE);
     assertThat(keyGen1.getRecordKey(rowData1), is("uuid:id1,name:Danny"));
     assertThat(keyGen1.getPartitionPath(rowData1), 
is("par1/1970-01-01T00:00:00.001"));
 
@@ -134,7 +138,7 @@ public class TestRowDataKeyGen {
 
     // hive style partitioning
     conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
-    final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, 
TestConfigurations.ROW_TYPE);
+    final RowDataKeyGen keyGen2 = RowDataKeyGens.instance(conf, 
TestConfigurations.ROW_TYPE);
     assertThat(keyGen2.getPartitionPath(rowData1), 
is(String.format("partition=%s/ts=%s", "par1", "1970-01-01T00:00:00.001")));
     assertThat(keyGen2.getPartitionPath(rowData2), 
is(String.format("partition=%s/ts=%s", DEFAULT_PARTITION_PATH, 
DEFAULT_PARTITION_PATH)));
     assertThat(keyGen2.getPartitionPath(rowData3), 
is(String.format("partition=%s/ts=%s", DEFAULT_PARTITION_PATH, 
"1970-01-01T00:00:00.001")));
@@ -147,7 +151,7 @@ public class TestRowDataKeyGen {
     HoodieTableFactory.setupTimestampKeygenOptions(conf, 
DataTypes.TIMESTAMP(3));
     final RowData rowData1 = insertRow(StringData.fromString("id1"), 
StringData.fromString("Danny"), 23,
         TimestampData.fromEpochMillis(7200000), StringData.fromString("par1"));
-    final RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, 
TestConfigurations.ROW_TYPE);
+    final RowDataKeyGen keyGen1 = RowDataKeyGens.instance(conf, 
TestConfigurations.ROW_TYPE);
 
     assertThat(keyGen1.getRecordKey(rowData1), is("id1"));
     assertThat(keyGen1.getPartitionPath(rowData1), is("1970010102"));
@@ -165,7 +169,7 @@ public class TestRowDataKeyGen {
 
     // hive style partitioning
     conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
-    final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, 
TestConfigurations.ROW_TYPE);
+    final RowDataKeyGen keyGen2 = RowDataKeyGens.instance(conf, 
TestConfigurations.ROW_TYPE);
     assertThat(keyGen2.getPartitionPath(rowData1), is("ts=1970010102"));
     assertThat(keyGen2.getPartitionPath(rowData2), is("ts=1970010100"));
     assertThat(keyGen2.getPartitionPath(rowData3), is("ts=1970010100"));
@@ -179,7 +183,7 @@ public class TestRowDataKeyGen {
     conf.setString(TIMESTAMP_OUTPUT_DATE_FORMAT.key(), "yyyy-MM-dd");
     final RowData rowData4 = insertRow(StringData.fromString("id1"), 
StringData.fromString("Danny"), 23,
         TimestampData.fromEpochMillis(7200000), 
StringData.fromString("2004-02-29 01:02:03"));
-    final RowDataKeyGen keyGen3 = RowDataKeyGen.instance(conf, 
TestConfigurations.ROW_TYPE);
+    final RowDataKeyGen keyGen3 = RowDataKeyGens.instance(conf, 
TestConfigurations.ROW_TYPE);
     assertThat(keyGen3.getPartitionPath(rowData4), is("2004-02-29"));
   }
 
@@ -193,7 +197,7 @@ public class TestRowDataKeyGen {
     HoodieTableFactory.setupTimestampKeygenOptions(conf, DataTypes.DATE());
     final RowData rowData1 = insertRow(TestConfigurations.ROW_TYPE_DATE,
         StringData.fromString("id1"), StringData.fromString("Danny"), 23, 1);
-    final RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, 
TestConfigurations.ROW_TYPE_DATE);
+    final RowDataKeyGen keyGen1 = RowDataKeyGens.instance(conf, 
TestConfigurations.ROW_TYPE_DATE);
 
     assertThat(keyGen1.getRecordKey(rowData1), is("id1"));
     String expectedPartition1 = dashed ? "1970-01-02" : "19700102";
@@ -213,7 +217,7 @@ public class TestRowDataKeyGen {
 
     // hive style partitioning
     conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
-    final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, 
TestConfigurations.ROW_TYPE_DATE);
+    final RowDataKeyGen keyGen2 = RowDataKeyGens.instance(conf, 
TestConfigurations.ROW_TYPE_DATE);
     assertThat(keyGen2.getPartitionPath(rowData1), is("dt=" + 
expectedPartition1));
     assertThat(keyGen2.getPartitionPath(rowData2), is("dt=" + 
expectedPartition2));
     assertThat(keyGen2.getPartitionPath(rowData3), is("dt=" + 
expectedPartition3));
@@ -249,14 +253,42 @@ public class TestRowDataKeyGen {
     Timestamp ts = new Timestamp(1675841687000L);
     final RowData rowData1 = insertRow(StringData.fromString("id1"), 
StringData.fromString("Danny"), 23,
         TimestampData.fromTimestamp(ts), StringData.fromString("par1"));
-    final RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, 
TestConfigurations.ROW_TYPE);
+    final RowDataKeyGen keyGen1 = RowDataKeyGens.instance(conf, 
TestConfigurations.ROW_TYPE);
 
     assertThat(keyGen1.getRecordKey(rowData1), is("uuid:id1,ts:" + 
ts.toLocalDateTime().toString()));
 
     
conf.setString(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
 "false");
-    final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, 
TestConfigurations.ROW_TYPE);
+    final RowDataKeyGen keyGen2 = RowDataKeyGens.instance(conf, 
TestConfigurations.ROW_TYPE);
 
     assertThat(keyGen2.getRecordKey(rowData1), 
is("uuid:id1,ts:1675841687000"));
+  }
+
+  @Test
+  void testAutoKeyGenRecordKey() {
+    Configuration conf = TestConfigurations.getDefaultConf("path1");
+    // without record keys AutoRowDataKeyGen will be used, which expects 
taskId and instantTime parameters for instantiation
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "");
+
+    int taskId = 1;
+    String instantTime = "20250716145212986";
+    final AutoRowDataKeyGen autoKeyGen = (AutoRowDataKeyGen) 
RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE, taskId, instantTime);
+    assertThat(autoKeyGen.getRecordKey(TestData.DATA_SET_INSERT.get(0)), 
is(instantTime + "_" + taskId + "_0"));
+    assertThat(autoKeyGen.getRecordKey(TestData.DATA_SET_INSERT.get(1)), 
is(instantTime + "_" + taskId + "_1"));
+  }
+
+  @Test
+  void testAutoKeyGenNotAllowNulls() {
+    Configuration conf = TestConfigurations.getDefaultConf("path1");
+    // without record keys AutoRowDataKeyGen will be used, which expects 
taskId and instantTime parameters for instantiation
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "");
 
+    HoodieValidationException exNullInstant =
+        assertThrows(HoodieValidationException.class, () -> 
RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE, 1, null));
+    assertLinesMatch(Collections.singletonList("'taskId' and 'instantTime' 
cannot be null to use AutoRowDataKeyGen. 'taskId' = 1, 'instantTime' = null"),
+        Collections.singletonList(exNullInstant.getMessage()));
+    HoodieValidationException exNullTask =
+        assertThrows(HoodieValidationException.class, () -> 
RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE, null, 
"20250716145212986"));
+    assertLinesMatch(Collections.singletonList("'taskId' and 'instantTime' 
cannot be null to use AutoRowDataKeyGen. 'taskId' = null, 'instantTime' = 
20250716145212986"),
+        Collections.singletonList(exNullTask.getMessage()));
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
index 73fd955d3579..d78bd234896e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
@@ -27,6 +27,7 @@ import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
 import org.apache.hudi.sink.bulk.BulkInsertWriteFunction;
 import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.bulk.RowDataKeyGens;
 import org.apache.hudi.sink.bulk.sort.SortOperator;
 import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
 import org.apache.hudi.sink.common.AbstractWriteFunction;
@@ -217,7 +218,7 @@ public class BulkInsertFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
   }
 
   private void setupMapFunction() {
-    RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
+    RowDataKeyGen keyGen = RowDataKeyGens.instance(conf, rowType);
     String indexKeys = OptionsResolver.getIndexKeyField(conf);
     boolean needFixedFileIdSuffix = 
OptionsResolver.isNonBlockingConcurrencyControl(conf);
     this.bucketIdToFileId = new HashMap<>();

Reply via email to