This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1582d602b0c5 fix(flink): Reject deferred RLI initialization for flink writer (#18399)
1582d602b0c5 is described below
commit 1582d602b0c5366056bb3bb46304a5bafba3a551
Author: Shuo Cheng <[email protected]>
AuthorDate: Tue Apr 7 17:36:33 2026 +0800
fix(flink): Reject deferred RLI initialization for flink writer (#18399)
---
.../src/main/java/org/apache/hudi/table/HoodieTableFactory.java | 9 +++++++--
.../test/java/org/apache/hudi/table/TestHoodieTableFactory.java | 9 +++++++++
2 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 37a57cdd6e78..fa703b2eef1a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -190,9 +190,14 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
if (indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX) {
ValidationUtils.checkArgument(conf.get(FlinkOptions.METADATA_ENABLED),
- "Metadata table should be enabled when index.type is GLOBAL_RECORD_LEVEL_INDEX.");
+ String.format("Metadata table should be enabled when %s is %s.", FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX));
ValidationUtils.checkArgument(conf.get(FlinkOptions.INDEX_GLOBAL_ENABLED),
- "Partition level index updating is not supported for GLOBAL_RECORD_LEVEL_INDEX, please set 'index.global.enabled' = 'true'.");
+ String.format("Partition level index updating is not supported for GLOBAL_RECORD_LEVEL_INDEX, please set '%s' = 'true'.", FlinkOptions.INDEX_GLOBAL_ENABLED.key()));
+
+ boolean deferredRLI = Boolean.parseBoolean(conf.getString(
+ HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key(), HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.defaultValue().toString()));
+ ValidationUtils.checkArgument(!deferredRLI,
+ String.format("Deferred RLI initialization is not supported for flink ingestion, please set '%s' = 'false'.", HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key()));
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 7ccbb59d089b..50bebdb3737c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
@@ -226,6 +227,14 @@ public class TestHoodieTableFactory {
this.conf.set(FlinkOptions.INDEX_GLOBAL_ENABLED, false);
final MockContext sourceContext5 = MockContext.getInstance(this.conf, schema, "f2");
assertThrows(IllegalArgumentException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext5));
+
+ // Deferred RLI initialization is not supported
+ this.conf.set(FlinkOptions.INDEX_TYPE, "GLOBAL_RECORD_LEVEL_INDEX");
+ this.conf.set(FlinkOptions.METADATA_ENABLED, true);
+ this.conf.set(FlinkOptions.INDEX_GLOBAL_ENABLED, true);
+ this.conf.setString(HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key(), "true");
+ final MockContext sourceContext6 = MockContext.getInstance(this.conf, schema, "f2");
+ assertThrows(IllegalArgumentException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext6));
}
@Test