hudi-agent commented on code in PR #18640:
URL: https://github.com/apache/hudi/pull/18640#discussion_r3206198466
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java:
##########
@@ -196,16 +196,26 @@ private void checkIndexType(Configuration conf) {
       HoodieIndexConfig.INDEX_TYPE.checkValues(indexTypeStr);
     }
     HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
-    if (indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX) {
-      ValidationUtils.checkArgument(conf.get(FlinkOptions.METADATA_ENABLED),
-          String.format("Metadata table should be enabled when %s is %s.", FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX));
-
-      ValidationUtils.checkArgument(conf.get(FlinkOptions.INDEX_GLOBAL_ENABLED),
-          String.format("Partition level index updating is not supported for GLOBAL_RECORD_LEVEL_INDEX, please set '%s' = 'true'.", FlinkOptions.INDEX_GLOBAL_ENABLED.key()));
-
-      boolean deferredRLI = Boolean.parseBoolean(conf.getString(
-          HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key(), HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.defaultValue().toString()));
-      ValidationUtils.checkArgument(!deferredRLI,
-          String.format("Deferred RLI initialization is not supported for flink ingestion, please set '%s' = 'false'.", HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key()));
+    switch (indexType) {
+      case GLOBAL_RECORD_LEVEL_INDEX:
+        ValidationUtils.checkArgument(conf.get(FlinkOptions.METADATA_ENABLED),
+            String.format("Metadata table should be enabled when %s is %s.", FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX));
+
+        ValidationUtils.checkArgument(conf.get(FlinkOptions.INDEX_GLOBAL_ENABLED),
+            String.format("Partition level index updating is not supported for GLOBAL_RECORD_LEVEL_INDEX, please set '%s' = 'true'.", FlinkOptions.INDEX_GLOBAL_ENABLED.key()));
+
+        boolean deferredRLI = Boolean.parseBoolean(conf.getString(
+            HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key(), HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.defaultValue().toString()));
+        ValidationUtils.checkArgument(!deferredRLI,
+            String.format("Deferred RLI initialization is not supported for flink ingestion, please set '%s' = 'false'.", HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key()));
+        break;
+      case RECORD_LEVEL_INDEX:
+        ValidationUtils.checkArgument(OptionsResolver.isUpsertOperation(conf) || OptionsResolver.isInsertOverwrite(conf),
+            "Partitioned record level index supports only Flink streaming upsert and insert overwrite.");
+
+        ValidationUtils.checkArgument(!OptionsResolver.isNonBlockingConcurrencyControl(conf),
+            "Partitioned record level index does not support non-blocking concurrency control.");
Review Comment:
🤖 The previous `Pipelines.validateRecordLevelIndexStreamWrite` had three
checks; this case keeps the upsert/insertOverwrite and NBCC ones but drops
`!OptionsResolver.isMultiWriter(conf)`. Since `isMultiWriter` is true for both
OCC and NBCC, OPTIMISTIC_CONCURRENCY_CONTROL is now silently accepted for
partitioned RLI — was that intentional? Each writer bootstraps its own
`RecordLevelIndexBackend` cache from MDT and won't observe the other writer's
in-flight assignments. Also, `sanityCheck` doesn't run on the
`HoodieFlinkStreamer` path, so misconfigurations there will no longer be
rejected up front.
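
To make the gap concrete, here is a minimal standalone sketch of the check structure (the `ConcurrencyMode` enum and `oldCheckPasses`/`newCheckPasses` methods are hypothetical stand-ins, not the Hudi API): the old helper rejected any multi-writer mode for partitioned RLI, while the new case rejects only NBCC, so OCC now passes validation.

```java
// Hypothetical model of the validation change; names are illustrative only.
public class RliValidationSketch {
  enum ConcurrencyMode { SINGLE_WRITER, OCC, NBCC }

  // Mirrors OptionsResolver.isMultiWriter(conf): true for both OCC and NBCC.
  static boolean isMultiWriter(ConcurrencyMode mode) {
    return mode == ConcurrencyMode.OCC || mode == ConcurrencyMode.NBCC;
  }

  // Mirrors OptionsResolver.isNonBlockingConcurrencyControl(conf).
  static boolean isNonBlocking(ConcurrencyMode mode) {
    return mode == ConcurrencyMode.NBCC;
  }

  // Old helper (Pipelines.validateRecordLevelIndexStreamWrite):
  // rejected every multi-writer mode, including OCC.
  static boolean oldCheckPasses(ConcurrencyMode mode) {
    return !isMultiWriter(mode);
  }

  // New switch case: only the NBCC check survives, so OCC slips through.
  static boolean newCheckPasses(ConcurrencyMode mode) {
    return !isNonBlocking(mode);
  }
}
```

If dropping the guard was unintentional, restoring a `!OptionsResolver.isMultiWriter(conf)` check inside the `RECORD_LEVEL_INDEX` case would recover the old behavior for both OCC and NBCC.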
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]