nsivabalan commented on code in PR #13615:
URL: https://github.com/apache/hudi/pull/13615#discussion_r2232300319
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -807,6 +843,82 @@ public String getRecordMergeStrategyId() {
return getString(RECORD_MERGE_STRATEGY_ID);
}
+ /**
+ * Handle table config creation logic when creating a table for Table
Version 9,
+ * since it has some different logic from previous versions.
+ */
+ public static Map<String, String>
inferMergingConfigsForVersion9(RecordMergeMode recordMergeMode,
+ String
payloadClassName,
+ String
recordMergeStrategyId,
+
HoodieTableVersion tableVersion) {
+ Map<String, String> finalConfigs = new HashMap<>();
+ if (tableVersion.versionCode() != HoodieTableVersion.NINE.versionCode()) {
+ return finalConfigs;
+ }
+
+ // Properties for all tables.
+ // Since PARTIAL_UPDATE_MODE does not have default value, do not set it.
+ // For tables using MERGE MODEs (event time/commit time), or CUSTOM
mergers.
+ if (StringUtils.isNullOrEmpty(payloadClassName)) {
+ checkIfConsistent(recordMergeMode, recordMergeStrategyId);
+ finalConfigs.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+ finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), recordMergeStrategyId);
+ // TODO: We need to make sure these two are consistent.
+ // NOTE: No payload class should be set.
+ } else { // For tables using custom payload classes.
+ // For tables using CUSTOM payload classes.
+ if (!PAYLOADS_UNDER_DEPRECATION.contains(payloadClassName)) {
+ finalConfigs.put(RECORD_MERGE_MODE.key(), CUSTOM.toString());
+ finalConfigs.put(PAYLOAD_CLASS_NAME.key(), payloadClassName);
+ finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(),
PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+ } else { // For tables using builtin payload classes.
+ // Merge mode.
+ // NOTE: We use LEGACY_PAYLOAD_CLASS_NAME instead of
PAYLOAD_CLASS_NAME here.
+ if (EVENT_TIME_BASED_PAYLOADS.contains(payloadClassName)) {
+ finalConfigs.put(RECORD_MERGE_MODE.key(),
EVENT_TIME_ORDERING.name());
+ finalConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+ finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(),
EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
+ } else {
+ finalConfigs.put(RECORD_MERGE_MODE.key(),
COMMIT_TIME_ORDERING.name());
+ finalConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+ finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(),
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+ }
+ // Partial update mode.
+ if (payloadClassName.equals(PartialUpdateAvroPayload.class.getName())
+ ||
payloadClassName.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName()))
{
+ finalConfigs.put(PARTIAL_UPDATE_MODE.key(),
PartialUpdateMode.IGNORE_DEFAULTS.name());
+ } else if
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+ finalConfigs.put(PARTIAL_UPDATE_MODE.key(),
PartialUpdateMode.IGNORE_MARKERS.name());
+ }
+ // Merge properties.
+ if
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+ finalConfigs.put(MERGE_CUSTOM_PROPERTY_PREFIX +
PARTIAL_UPDATE_CUSTOM_MARKER, DEBEZIUM_UNAVAILABLE_VALUE);
+ } else if (payloadClassName.equals(AWSDmsAvroPayload.class.getName()))
{
+ finalConfigs.put(MERGE_CUSTOM_PROPERTY_PREFIX + DELETE_KEY, "Op");
+ finalConfigs.put(MERGE_CUSTOM_PROPERTY_PREFIX + DELETE_MARKER, "D");
+ }
+ }
+ }
+ return finalConfigs;
+ }
+
+ private static void checkIfConsistent(RecordMergeMode mode, String
strategyId) {
+ boolean isConsistent;
Review Comment:
Don't we already have similar methods?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -807,6 +843,82 @@ public String getRecordMergeStrategyId() {
return getString(RECORD_MERGE_STRATEGY_ID);
}
+ /**
+ * Handle table config creation logic when creating a table for Table
Version 9,
+ * since it has some different logic from previous versions.
+ */
+ public static Map<String, String>
inferMergingConfigsForVersion9(RecordMergeMode recordMergeMode,
+ String
payloadClassName,
+ String
recordMergeStrategyId,
+
HoodieTableVersion tableVersion) {
+ Map<String, String> finalConfigs = new HashMap<>();
Review Comment:
How about renaming `finalConfigs` to `reconciledConfigs`?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -119,6 +129,35 @@ public class HoodieTableConfig extends HoodieConfig {
public static final String HOODIE_TABLE_NAME_KEY = "hoodie.table.name";
public static final String PARTIAL_UPDATE_CUSTOM_MARKER =
"hoodie.write.partial.update.custom.marker";
public static final String DEBEZIUM_UNAVAILABLE_VALUE =
"__debezium_unavailable_value";
+ // This prefix is used to set merging related properties.
+ // A reader might need to read some writer properties to function as
expected,
+ // and Hudi stores properties with this prefix so the reader parses these
properties,
+ // and produces a map of key value pairs (Key1->Value1, Key2->Value2, ...)
to use.
Review Comment:
Misplaced Javadoc.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -807,6 +844,133 @@ public String getRecordMergeStrategyId() {
return getString(RECORD_MERGE_STRATEGY_ID);
}
+ /**
+ * Handle table config creation logic when creating a table for Table
Version 9,
+ * since it has some different logic from previous versions.
+ */
+ public static Map<String, String>
inferMergingConfigsForVersion9(RecordMergeMode recordMergeMode,
+ String
payloadClassName,
+ String
recordMergeStrategyId,
+
HoodieTableVersion tableVersion) {
+ Map<String, String> finalConfigs = new HashMap<>();
+ if (tableVersion.versionCode() != HoodieTableVersion.NINE.versionCode()) {
+ return finalConfigs;
+ }
+
+ // Infer merge mode and strategy id if needed.
+ Pair<RecordMergeMode, String> mergeModeAndStrategyId =
+ inferMergeModeOrStrategyId(recordMergeMode, recordMergeStrategyId,
payloadClassName);
+ recordMergeMode = mergeModeAndStrategyId.getLeft();
+ recordMergeStrategyId = mergeModeAndStrategyId.getRight();
+
+ // Properties for all tables.
+ // Since PARTIAL_UPDATE_MODE does not have default value, do not set it.
+ // For tables using MERGE MODEs (event time/commit time), or CUSTOM
mergers.
+ if (StringUtils.isNullOrEmpty(payloadClassName)) {
+ finalConfigs.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+ finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), recordMergeStrategyId);
+ // NOTE: No payload class should be set.
+ } else { // For tables using custom payload classes.
+ // For tables using CUSTOM payload classes.
+ if (!PAYLOADS_UNDER_DEPRECATION.contains(payloadClassName)) {
+ finalConfigs.put(RECORD_MERGE_MODE.key(), CUSTOM.toString());
+ finalConfigs.put(PAYLOAD_CLASS_NAME.key(), payloadClassName);
+ finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(),
PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+ } else { // For tables using builtin payload classes.
+ // Merge mode.
+ // NOTE: We use LEGACY_PAYLOAD_CLASS_NAME instead of
PAYLOAD_CLASS_NAME here.
+ if (EVENT_TIME_BASED_PAYLOADS.contains(payloadClassName)) {
+ finalConfigs.put(RECORD_MERGE_MODE.key(),
EVENT_TIME_ORDERING.name());
+ finalConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+ finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(),
EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
+ } else {
+ finalConfigs.put(RECORD_MERGE_MODE.key(),
COMMIT_TIME_ORDERING.name());
+ finalConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+ finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(),
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+ }
+ // Partial update mode.
+ if (payloadClassName.equals(PartialUpdateAvroPayload.class.getName())
+ ||
payloadClassName.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName()))
{
+ finalConfigs.put(PARTIAL_UPDATE_MODE.key(),
PartialUpdateMode.IGNORE_DEFAULTS.name());
+ } else if
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+ finalConfigs.put(PARTIAL_UPDATE_MODE.key(),
PartialUpdateMode.IGNORE_MARKERS.name());
+ }
+ // Merge properties.
+ if
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+ finalConfigs.put(MERGE_CUSTOM_PROPERTY_PREFIX +
PARTIAL_UPDATE_CUSTOM_MARKER, DEBEZIUM_UNAVAILABLE_VALUE);
+ } else if (payloadClassName.equals(AWSDmsAvroPayload.class.getName()))
{
+ finalConfigs.put(MERGE_CUSTOM_PROPERTY_PREFIX + DELETE_KEY, "Op");
Review Comment:
Let's declare static constants for "Op" and "D".
In fact, AWSDmsAvroPayload should already have them.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -807,6 +844,133 @@ public String getRecordMergeStrategyId() {
return getString(RECORD_MERGE_STRATEGY_ID);
}
+ /**
+ * Handle table config creation logic when creating a table for Table
Version 9,
+ * since it has some different logic from previous versions.
+ */
+ public static Map<String, String>
inferMergingConfigsForVersion9(RecordMergeMode recordMergeMode,
+ String
payloadClassName,
+ String
recordMergeStrategyId,
+
HoodieTableVersion tableVersion) {
+ Map<String, String> finalConfigs = new HashMap<>();
+ if (tableVersion.versionCode() != HoodieTableVersion.NINE.versionCode()) {
+ return finalConfigs;
+ }
+
+ // Infer merge mode and strategy id if needed.
+ Pair<RecordMergeMode, String> mergeModeAndStrategyId =
+ inferMergeModeOrStrategyId(recordMergeMode, recordMergeStrategyId,
payloadClassName);
+ recordMergeMode = mergeModeAndStrategyId.getLeft();
+ recordMergeStrategyId = mergeModeAndStrategyId.getRight();
+
+ // Properties for all tables.
+ // Since PARTIAL_UPDATE_MODE does not have default value, do not set it.
+ // For tables using MERGE MODEs (event time/commit time), or CUSTOM
mergers.
+ if (StringUtils.isNullOrEmpty(payloadClassName)) {
+ finalConfigs.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+ finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), recordMergeStrategyId);
+ // NOTE: No payload class should be set.
+ } else { // For tables using custom payload classes.
+ // For tables using CUSTOM payload classes.
+ if (!PAYLOADS_UNDER_DEPRECATION.contains(payloadClassName)) {
+ finalConfigs.put(RECORD_MERGE_MODE.key(), CUSTOM.toString());
Review Comment:
Can we validate that this is only possible with CUSTOM merge mode?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -1476,13 +1477,21 @@ public Properties build() {
tableConfig.setTableVersion(tableVersion);
tableConfig.setInitialVersion(tableVersion);
- Triple<RecordMergeMode, String, String> mergeConfigs =
- HoodieTableConfig.inferCorrectMergingBehavior(
- recordMergeMode, payloadClassName, recordMergerStrategyId,
preCombineField,
- tableVersion);
- tableConfig.setValue(RECORD_MERGE_MODE, mergeConfigs.getLeft().name());
- tableConfig.setValue(PAYLOAD_CLASS_NAME.key(), mergeConfigs.getMiddle());
- tableConfig.setValue(RECORD_MERGE_STRATEGY_ID, mergeConfigs.getRight());
+ if (tableVersion.lesserThan(HoodieTableVersion.NINE)) {
+ Triple<RecordMergeMode, String, String> mergeConfigs =
+ HoodieTableConfig.inferCorrectMergingBehavior(
+ recordMergeMode, payloadClassName, recordMergerStrategyId,
preCombineField,
+ tableVersion);
+ tableConfig.setValue(RECORD_MERGE_MODE, mergeConfigs.getLeft().name());
+ tableConfig.setValue(PAYLOAD_CLASS_NAME.key(),
mergeConfigs.getMiddle());
+ tableConfig.setValue(RECORD_MERGE_STRATEGY_ID,
mergeConfigs.getRight());
+ } else {
+ Map<String, String> mergeConfigs = inferMergingConfigsForVersion9(
Review Comment:
We can simplify this:
call the same inference function that we use for table version 8, and then apply
the same mapping that we do during the 8-to-9 upgrade.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -807,6 +843,82 @@ public String getRecordMergeStrategyId() {
return getString(RECORD_MERGE_STRATEGY_ID);
}
+ /**
+ * Handle table config creation logic when creating a table for Table
Version 9,
+ * since it has some different logic from previous versions.
+ */
+ public static Map<String, String>
inferMergingConfigsForVersion9(RecordMergeMode recordMergeMode,
Review Comment:
Why public?
Also, let's avoid making methods static everywhere. If we can keep it
private, let's do that.
Why make every method static?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]