yihua commented on code in PR #11943:
URL: https://github.com/apache/hudi/pull/11943#discussion_r1799908699
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -185,41 +196,24 @@ public class HoodieTableConfig extends HoodieConfig {
.key("hoodie.timeline.layout.version")
.noDefaultValue()
.withDocumentation("Version of timeline used, by the table.");
-
- //TODO: why is this the default? not OVERWRITE_WITH_LATEST?
- public static final ConfigProperty<String> RECORD_MERGE_MODE = ConfigProperty
+
+ public static final ConfigProperty<RecordMergeMode> RECORD_MERGE_MODE =
ConfigProperty
.key("hoodie.record.merge.mode")
- .defaultValue(RecordMergeMode.EVENT_TIME_ORDERING.name())
+ .defaultValue(RecordMergeMode.EVENT_TIME_ORDERING)
.sinceVersion("1.0.0")
.withDocumentation(RecordMergeMode.class);
public static final ConfigProperty<String> PAYLOAD_CLASS_NAME =
ConfigProperty
.key("hoodie.compaction.payload.class")
- .defaultValue(DefaultHoodieRecordPayload.class.getName())
+ .noDefaultValue()
Review Comment:
Let's add more detail to the PR description so that Hudi contributors know how
to use the relevant APIs in the future, especially (1) how the payload class is
now determined (no default for OVERWRITE_WITH_LATEST and EVENT_TIME_ORDERING)
compared to before (a default payload class that governed all merging), and
(2) how the merge mode decides the payload class and where this logic is used.
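As a starting point for that description, here is a minimal, self-contained sketch of the resolution order suggested by `getAvroPayloadClass()` in this PR: an explicitly configured payload class wins, otherwise the class is derived from the merge mode. The `MergeMode` enum and the helper names are hypothetical stand-ins, `java.util.Optional` replaces Hudi's `Option`, and the mode-to-class mapping is an assumption; the real mapping lives in `HoodieRecordPayload.getAvroPayloadForMergeMode`.

```java
import java.util.Optional;

final class PayloadResolutionSketch {
  // Hypothetical stand-in for RecordMergeMode; constant names are taken from the diff.
  enum MergeMode { EVENT_TIME_ORDERING, OVERWRITE_WITH_LATEST, CUSTOM }

  // Assumed mapping from merge mode to Avro payload class (not confirmed by the diff).
  static String payloadForMergeMode(MergeMode mode) {
    switch (mode) {
      case EVENT_TIME_ORDERING:
        return "org.apache.hudi.common.model.DefaultHoodieRecordPayload";
      case OVERWRITE_WITH_LATEST:
        return "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload";
      default:
        throw new IllegalArgumentException(
            "Merge mode " + mode + " requires an explicitly configured payload class");
    }
  }

  // An explicitly configured payload class wins; otherwise fall back to the merge mode.
  static String resolvePayloadClass(Optional<String> configured, MergeMode mode) {
    return configured.orElseGet(() -> payloadForMergeMode(mode));
  }

  public static void main(String[] args) {
    System.out.println(resolvePayloadClass(Optional.empty(), MergeMode.EVENT_TIME_ORDERING));
    System.out.println(resolvePayloadClass(Optional.of("com.example.MyPayload"), MergeMode.CUSTOM));
  }
}
```

The fallback is evaluated lazily, so a CUSTOM table with an explicit payload class never reaches the throwing branch.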
##########
hudi-common/src/main/java/org/apache/hudi/common/model/RecordPayloadType.java:
##########
@@ -94,19 +94,17 @@ public static RecordPayloadType fromClassName(String
className) {
return CUSTOM;
}
- public static String getPayloadClassName(HoodieConfig config) {
+ public static Option<String> getPayloadClassName(HoodieConfig config) {
String payloadClassName;
if (config.contains(PAYLOAD_CLASS_NAME)) {
payloadClassName = config.getString(PAYLOAD_CLASS_NAME);
- } else if (config.contains(PAYLOAD_TYPE)) {
- payloadClassName =
RecordPayloadType.valueOf(config.getString(PAYLOAD_TYPE)).getClassName();
} else if (config.contains("hoodie.datasource.write.payload.class")) {
payloadClassName =
config.getString("hoodie.datasource.write.payload.class");
} else {
- payloadClassName =
RecordPayloadType.valueOf(PAYLOAD_TYPE.defaultValue()).getClassName();
+ return Option.empty();
Review Comment:
Should this return the default payload class name?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -151,8 +149,8 @@ public class HoodieTableMetaClient implements Serializable {
*/
protected HoodieTableMetaClient(HoodieStorage storage, String basePath,
boolean loadActiveTimelineOnLoad,
ConsistencyGuardConfig consistencyGuardConfig,
Option<TimelineLayoutVersion> layoutVersion,
- String payloadClassName, String
recordMergerStrategy, HoodieTimeGeneratorConfig timeGeneratorConfig,
- FileSystemRetryConfig fileSystemRetryConfig) {
+ RecordMergeMode recordMergeMode, String
payloadClassName, String recordMergerStrategy,
Review Comment:
Could we define a `RecordMergeConfig` POJO to pass the record merge related
configs around?
##########
hudi-common/src/main/java/org/apache/hudi/common/model/RecordPayloadType.java:
##########
@@ -94,19 +94,17 @@ public static RecordPayloadType fromClassName(String
className) {
return CUSTOM;
}
- public static String getPayloadClassName(HoodieConfig config) {
+ public static Option<String> getPayloadClassName(HoodieConfig config) {
String payloadClassName;
if (config.contains(PAYLOAD_CLASS_NAME)) {
payloadClassName = config.getString(PAYLOAD_CLASS_NAME);
- } else if (config.contains(PAYLOAD_TYPE)) {
- payloadClassName =
RecordPayloadType.valueOf(config.getString(PAYLOAD_TYPE)).getClassName();
Review Comment:
If we think it's OK to ignore format changes in beta releases, let's file a
Jira to document this change and to add tooling that converts the payload type
to the merge mode/payload class in the table config.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/RecordPayloadType.java:
##########
@@ -94,19 +94,17 @@ public static RecordPayloadType fromClassName(String
className) {
return CUSTOM;
}
- public static String getPayloadClassName(HoodieConfig config) {
+ public static Option<String> getPayloadClassName(HoodieConfig config) {
String payloadClassName;
if (config.contains(PAYLOAD_CLASS_NAME)) {
payloadClassName = config.getString(PAYLOAD_CLASS_NAME);
- } else if (config.contains(PAYLOAD_TYPE)) {
- payloadClassName =
RecordPayloadType.valueOf(config.getString(PAYLOAD_TYPE)).getClassName();
Review Comment:
We may want to keep the payload type config, since it's already out in the
1.0.0 beta releases, and treat it as deprecated: use only the value from the
table config to derive the payload class, and do not allow it in the write
config. cc @codope
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -185,41 +196,24 @@ public class HoodieTableConfig extends HoodieConfig {
.key("hoodie.timeline.layout.version")
.noDefaultValue()
.withDocumentation("Version of timeline used, by the table.");
-
- //TODO: why is this the default? not OVERWRITE_WITH_LATEST?
- public static final ConfigProperty<String> RECORD_MERGE_MODE = ConfigProperty
+
+ public static final ConfigProperty<RecordMergeMode> RECORD_MERGE_MODE =
ConfigProperty
.key("hoodie.record.merge.mode")
- .defaultValue(RecordMergeMode.EVENT_TIME_ORDERING.name())
+ .defaultValue(RecordMergeMode.EVENT_TIME_ORDERING)
.sinceVersion("1.0.0")
.withDocumentation(RecordMergeMode.class);
public static final ConfigProperty<String> PAYLOAD_CLASS_NAME =
ConfigProperty
.key("hoodie.compaction.payload.class")
- .defaultValue(DefaultHoodieRecordPayload.class.getName())
+ .noDefaultValue()
.deprecatedAfter("1.0.0")
.withDocumentation("Payload class to use for performing merges,
compactions, i.e merge delta logs with current base file and then "
+ " produce a new base file.");
- public static final ConfigProperty<String> PAYLOAD_TYPE = ConfigProperty
- .key("hoodie.compaction.payload.type")
- .defaultValue(RecordPayloadType.HOODIE_AVRO_DEFAULT.name())
- .sinceVersion("1.0.0")
- .withDocumentation(RecordPayloadType.class);
-
public static final ConfigProperty<String> RECORD_MERGER_STRATEGY =
ConfigProperty
- .key("hoodie.compaction.record.merger.strategy")
- .defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
- .withInferFunction(cfg -> {
- switch
(RecordMergeMode.valueOf(cfg.getStringOrDefault(RECORD_MERGE_MODE))) {
- case EVENT_TIME_ORDERING:
- return Option.of(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID);
- case OVERWRITE_WITH_LATEST:
- return
Option.of(HoodieRecordMerger.OVERWRITE_MERGER_STRATEGY_UUID);
- case CUSTOM:
- default:
- return Option.empty();
- }
- })
+ .key("hoodie.record.merge.custom.strategy")
+ .noDefaultValue()
+ .withAlternatives("hoodie.compaction.record.merger.strategy")
Review Comment:
There are places that use `ConfigProperty#key` directly to get configs.
Let's make sure the alternative table config key can actually take effect.
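To illustrate the concern, a small sketch of how a direct key lookup can miss a value stored under the old key, while an alternative-aware lookup finds it. `ConfigKey` and `lookup` are hypothetical, simplified stand-ins rather than Hudi's `ConfigProperty` API; only the two key strings come from the diff.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;

final class AlternativeKeySketch {
  // Hypothetical, minimal stand-in for a config property with alternative keys.
  static final class ConfigKey {
    final String key;
    final List<String> alternatives;

    ConfigKey(String key, String... alternatives) {
      this.key = key;
      this.alternatives = Arrays.asList(alternatives);
    }
  }

  // Checks the primary key first, then each alternative key in declaration order.
  static Optional<String> lookup(Properties props, ConfigKey config) {
    String value = props.getProperty(config.key);
    if (value != null) {
      return Optional.of(value);
    }
    for (String alternative : config.alternatives) {
      value = props.getProperty(alternative);
      if (value != null) {
        return Optional.of(value);
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    ConfigKey strategy = new ConfigKey(
        "hoodie.record.merge.custom.strategy",
        "hoodie.compaction.record.merger.strategy");

    // A table written before the rename only has the old key.
    Properties tableProps = new Properties();
    tableProps.setProperty("hoodie.compaction.record.merger.strategy", "example-strategy-id");

    System.out.println("direct key lookup: " + tableProps.getProperty(strategy.key));
    System.out.println("alternative-aware lookup: " + lookup(tableProps, strategy).orElse(null));
  }
}
```

Any call site that reads the property by its primary key string alone behaves like the first lookup, so those call sites are the ones worth auditing.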
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -151,8 +149,8 @@ public class HoodieTableMetaClient implements Serializable {
*/
protected HoodieTableMetaClient(HoodieStorage storage, String basePath,
boolean loadActiveTimelineOnLoad,
ConsistencyGuardConfig consistencyGuardConfig,
Option<TimelineLayoutVersion> layoutVersion,
- String payloadClassName, String
recordMergerStrategy, HoodieTimeGeneratorConfig timeGeneratorConfig,
- FileSystemRetryConfig fileSystemRetryConfig) {
+ RecordMergeMode recordMergeMode, String
payloadClassName, String recordMergerStrategy,
Review Comment:
Should the upper layers only use `RecordMergeConfig`, while the write/table
config and meta client layers determine what it contains?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -684,21 +690,85 @@ public void setInitialVersion(HoodieTableVersion
initialVersion) {
}
public RecordMergeMode getRecordMergeMode() {
- return
RecordMergeMode.valueOf(getStringOrDefault(RECORD_MERGE_MODE).toUpperCase());
+ return RecordMergeMode.getValue(getStringOrDefault(RECORD_MERGE_MODE));
}
/**
* Read the payload class for HoodieRecords from the table properties.
*/
- public String getPayloadClass() {
+ public Option<String> getPayloadClass() {
return RecordPayloadType.getPayloadClassName(this);
}
+ public String getAvroPayloadClass() {
+ return getPayloadClass().orElseGet(() ->
HoodieRecordPayload.getAvroPayloadForMergeMode(getRecordMergeMode()));
+ }
+
+ public String getAvroPayloadClassNonThrow() {
+ return getPayloadClass().orElseGet(() ->
HoodieRecordPayload.getAvroPayloadForMergeModeNonThrow(getRecordMergeMode()));
+ }
+
/**
* Read the payload class for HoodieRecords from the table properties.
*/
- public String getRecordMergerStrategy() {
- return getStringOrDefault(RECORD_MERGER_STRATEGY);
+ public Option<String> getRecordMergerStrategy() {
+ return getStringOpt(RECORD_MERGER_STRATEGY);
+ }
+
+ public String getAvroRecordMergerStrategy() {
+ return getRecordMergerStrategy().orElseGet(() ->
HoodieRecordMerger.getAvroMergerStrategyFromMergeMode(getRecordMergeMode()));
+ }
+
+ public static Triple<RecordMergeMode, String, String>
inferCorrectMergingBehavior(RecordMergeMode recordMergeMode, String
payloadClassName, String recordMergerStrategy) {
+ checkArgument(payloadClassName == null ||
!payloadClassName.equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload"),
"Expression payload is being deduced!!!!!!");
Review Comment:
Fix the error message.
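For example, the message could state what was passed and why it is rejected. The wording below is only a suggestion based on my reading of the check, and `checkArgument` here is a local stand-in so the sketch compiles on its own:

```java
final class ErrorMessageSketch {
  static final String EXPRESSION_PAYLOAD =
      "org.apache.spark.sql.hudi.command.payload.ExpressionPayload";

  // Local stand-in for a checkArgument-style precondition helper.
  static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  // Same condition as in the hunk above, with a descriptive message (suggested wording).
  static void validatePayloadClass(String payloadClassName) {
    checkArgument(
        payloadClassName == null || !payloadClassName.equals(EXPRESSION_PAYLOAD),
        "ExpressionPayload must not be passed as the payload class when inferring "
            + "the merge behavior, but got: " + payloadClassName);
  }

  public static void main(String[] args) {
    validatePayloadClass(null); // accepted: no payload class configured
    try {
      validatePayloadClass(EXPRESSION_PAYLOAD);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```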