linliu-code commented on code in PR #13615:
URL: https://github.com/apache/hudi/pull/13615#discussion_r2232963853
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -807,6 +844,133 @@ public String getRecordMergeStrategyId() {
return getString(RECORD_MERGE_STRATEGY_ID);
}
+ /**
+ * Handle table config creation logic when creating a table for Table Version 9,
+ * since it has some different logic from previous versions.
+ */
+ public static Map<String, String>
inferMergingConfigsForVersion9(RecordMergeMode recordMergeMode,
+ String
payloadClassName,
+ String
recordMergeStrategyId,
+
HoodieTableVersion tableVersion) {
+ Map<String, String> finalConfigs = new HashMap<>();
+ if (tableVersion.versionCode() != HoodieTableVersion.NINE.versionCode()) {
+ return finalConfigs;
+ }
+
+ // Infer merge mode and strategy id if needed.
+ Pair<RecordMergeMode, String> mergeModeAndStrategyId =
+ inferMergeModeOrStrategyId(recordMergeMode, recordMergeStrategyId,
payloadClassName);
+ recordMergeMode = mergeModeAndStrategyId.getLeft();
+ recordMergeStrategyId = mergeModeAndStrategyId.getRight();
+
+ // Properties for all tables.
+ // Since PARTIAL_UPDATE_MODE does not have default value, do not set it.
+ // For tables using MERGE MODEs (event time/commit time), or CUSTOM
mergers.
+ if (StringUtils.isNullOrEmpty(payloadClassName)) {
+ finalConfigs.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+ finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), recordMergeStrategyId);
+ // NOTE: No payload class should be set.
+ } else { // For tables using custom payload classes.
+ // For tables using CUSTOM payload classes.
+ if (!PAYLOADS_UNDER_DEPRECATION.contains(payloadClassName)) {
+ finalConfigs.put(RECORD_MERGE_MODE.key(), CUSTOM.toString());
+ finalConfigs.put(PAYLOAD_CLASS_NAME.key(), payloadClassName);
+ finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(),
PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+ } else { // For tables using builtin payload classes.
+ // Merge mode.
+ // NOTE: We use LEGACY_PAYLOAD_CLASS_NAME instead of
PAYLOAD_CLASS_NAME here.
+ if (EVENT_TIME_BASED_PAYLOADS.contains(payloadClassName)) {
+ finalConfigs.put(RECORD_MERGE_MODE.key(),
EVENT_TIME_ORDERING.name());
+ finalConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+ finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(),
EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
+ } else {
+ finalConfigs.put(RECORD_MERGE_MODE.key(),
COMMIT_TIME_ORDERING.name());
+ finalConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+ finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(),
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+ }
+ // Partial update mode.
+ if (payloadClassName.equals(PartialUpdateAvroPayload.class.getName())
+ ||
payloadClassName.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName()))
{
+ finalConfigs.put(PARTIAL_UPDATE_MODE.key(),
PartialUpdateMode.IGNORE_DEFAULTS.name());
+ } else if
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+ finalConfigs.put(PARTIAL_UPDATE_MODE.key(),
PartialUpdateMode.IGNORE_MARKERS.name());
+ }
+ // Merge properties.
+ if
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+ finalConfigs.put(MERGE_CUSTOM_PROPERTY_PREFIX +
PARTIAL_UPDATE_CUSTOM_MARKER, DEBEZIUM_UNAVAILABLE_VALUE);
+ } else if (payloadClassName.equals(AWSDmsAvroPayload.class.getName()))
{
+ finalConfigs.put(MERGE_CUSTOM_PROPERTY_PREFIX + DELETE_KEY, "Op");
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]