This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d4ff54b571c7 fix(flink): Use timestamp based partitioning in
AutoRowDataKeyGen (#18090)
d4ff54b571c7 is described below
commit d4ff54b571c73478317801dd8201cc8023801c0c
Author: Prashant Wason <[email protected]>
AuthorDate: Thu Feb 26 19:20:26 2026 -0800
fix(flink): Use timestamp based partitioning in AutoRowDataKeyGen (#18090)
Summary:
Problem: when FlinkOptions.RECORD_KEY_FIELD is set to empty, Flink Hudi
auto-generates the hoodie_record_key without requiring a primary key (the uuid
field, which is nullified in the ingestion job). However, the partition format is incorrectly set.
Solution: use TimestampBasedAvroKeyGenerator to format partitions in the
AutoRowDataKeyGen class, the same as is done in RowDataKeyGen.
Co-authored-by: Jing Li <[email protected]>
---
.../apache/hudi/sink/bulk/AutoRowDataKeyGen.java | 22 ++++++++++++++++++----
.../apache/hudi/sink/bulk/TestRowDataKeyGens.java | 20 ++++++++++++++++++++
2 files changed, 38 insertions(+), 4 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java
index 27979ee93c59..5bf5d944f7fa 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java
@@ -22,11 +22,16 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
+import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
+import java.io.IOException;
+
/**
* Key generator for {@link RowData} that use an auto key generator.
*/
@@ -42,17 +47,26 @@ public class AutoRowDataKeyGen extends RowDataKeyGen {
RowType rowType,
boolean hiveStylePartitioning,
boolean encodePartitionPath,
- boolean useCompkexKeygenNewEncoding) {
- super(Option.empty(), partitionFields, rowType, hiveStylePartitioning,
encodePartitionPath, false, Option.empty(),
- useCompkexKeygenNewEncoding);
+ boolean useComplexKeygenNewEncoding,
+ Option<TimestampBasedAvroKeyGenerator> keyGenOpt) {
+ super(Option.empty(), partitionFields, rowType, hiveStylePartitioning,
encodePartitionPath, false, keyGenOpt,
+ useComplexKeygenNewEncoding);
this.taskId = taskId;
this.instantTime = instantTime;
}
public static RowDataKeyGen instance(Configuration conf, RowType rowType,
int taskId, String instantTime) {
+ Option<TimestampBasedAvroKeyGenerator> keyGeneratorOpt = Option.empty();
+ if
(TimestampBasedAvroKeyGenerator.class.getName().equals(conf.get(FlinkOptions.KEYGEN_CLASS_NAME)))
{
+ try {
+ keyGeneratorOpt = Option.of(new
TimestampBasedAvroKeyGenerator(StreamerUtil.flinkConf2TypedProperties(conf)));
+ } catch (IOException e) {
+ throw new HoodieKeyException("Initialize
TimestampBasedAvroKeyGenerator error", e);
+ }
+ }
return new AutoRowDataKeyGen(taskId, instantTime,
conf.get(FlinkOptions.PARTITION_PATH_FIELD),
rowType, conf.get(FlinkOptions.HIVE_STYLE_PARTITIONING),
conf.get(FlinkOptions.URL_ENCODE_PARTITIONING),
- OptionsResolver.useComplexKeygenNewEncoding(conf));
+ OptionsResolver.useComplexKeygenNewEncoding(conf), keyGeneratorOpt);
}
@Override
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java
index 18b85fde733d..0199aa74c14b 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java
@@ -245,6 +245,26 @@ public class TestRowDataKeyGens {
assertThat(keyGen1.getRecordKey(rowData3), is(instantTime + "_" + taskId +
"_2"));
}
+ @Test
+ void testTimestampBasedKeyGeneratorForKeylessWrite() {
+ final String partitionFormat = "yyyy/MM/dd";
+ final int taskId = 3;
+ final String instantTime = "000001";
+
+ Configuration conf = TestConfigurations.getDefaultConf("path1");
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "");
+ conf.set(FlinkOptions.PARTITION_PATH_FIELD, "ts");
+ conf.set(FlinkOptions.PARTITION_FORMAT, partitionFormat);
+ HoodieTableFactory.setupTimestampKeygenOptions(conf,
DataTypes.TIMESTAMP(3));
+
+ final RowData rowData = insertRow(StringData.fromString("id1"),
StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(7200000),
StringData.fromString("par1"));
+
+ final RowDataKeyGen keyGen = RowDataKeyGens.instance(conf,
TestConfigurations.ROW_TYPE, taskId, instantTime);
+ assertThat(keyGen.getRecordKey(rowData), is(instantTime + "_" + taskId +
"_0"));
+ assertThat(keyGen.getPartitionPath(rowData), is("1970/01/01"));
+ }
+
@Test
void testRecordKeyContainsTimestamp() {
Configuration conf = TestConfigurations.getDefaultConf("path1");