nsivabalan commented on code in PR #13046:
URL: https://github.com/apache/hudi/pull/13046#discussion_r2017973607
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java:
##########
@@ -38,7 +40,7 @@ public interface DowngradeHandler {
    * @param upgradeDowngradeHelper instance of {@link SupportsUpgradeDowngrade} to be used.
    * @return Map of config properties and its values to be added to table properties.
    */
-  Map<ConfigProperty, String> downgrade(
+  Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(
Review Comment:
Please fix the Javadoc at lines 40-41: the @return description still mentions
only a map, but the method now returns a pair.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java:
##########
@@ -178,23 +179,68 @@ static void upgradePartitionFields(HoodieWriteConfig config, HoodieTableConfig t
   }
   static void upgradeMergeMode(HoodieTableConfig tableConfig, Map<ConfigProperty, String> tablePropsToAdd) {
-    if (tableConfig.getPayloadClass() != null
-        && tableConfig.getPayloadClass().equals(OverwriteWithLatestAvroPayload.class.getName())) {
-      if (HoodieTableType.COPY_ON_WRITE == tableConfig.getTableType()) {
+    String payloadClass = tableConfig.getPayloadClass();
+    String preCombineField = tableConfig.getPreCombineField();
+    if (isCustomPayloadClass(payloadClass)) {
+      // This contains a special case: HoodieMetadataPayload.
+      tablePropsToAdd.put(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME,
+          payloadClass);
+      tablePropsToAdd.put(
+          HoodieTableConfig.RECORD_MERGE_MODE,
+          RecordMergeMode.CUSTOM.name());
+      tablePropsToAdd.put(
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID,
+          HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+    } else if (StringUtils.nonEmpty(preCombineField)) {
+      // This contains a special case: OverwriteWithLatestPayload with preCombine field.
+      tablePropsToAdd.put(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME,
+          DefaultHoodieRecordPayload.class.getName());
Review Comment:
Won't we run into a table config validation issue here? For the payload class,
the value before the upgrade could be OverwriteWithLatestAvroPayload, and after
the upgrade it is DefaultHoodieRecordPayload.
I am not suggesting we revert this; I am just curious whether this could throw
an exception.
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java:
##########
@@ -122,4 +115,42 @@ void testPropertyUpgrade() {
     assertTrue(tablePropsToAdd.containsKey(KEY_GENERATOR_CLASS_NAME));
     assertTrue(tablePropsToAdd.containsKey(KEY_GENERATOR_TYPE));
   }
+
+  @ParameterizedTest
+  @CsvSource({
+      "com.example.CustomPayload, , CUSTOM, 00000000-0000-0000-0000-000000000000, com.example.CustomPayload",
+      "com.example.CustomPayload, preCombineFieldValue, CUSTOM, 00000000-0000-0000-0000-000000000000, com.example.CustomPayload",
+      "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload, , COMMIT_TIME_ORDERING, ce9acb64-bde0-424c-9b91-f6ebba25356d, org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
+      "org.apache.hudi.common.model.DefaultHoodieRecordPayload, , EVENT_TIME_ORDERING, eeb8d96f-b1e4-49fd-bbf8-28ac514178e5, org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+      ", preCombineFieldValue, EVENT_TIME_ORDERING, eeb8d96f-b1e4-49fd-bbf8-28ac514178e5, org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+      "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload, preCombineFieldValue, EVENT_TIME_ORDERING,"
+          + " eeb8d96f-b1e4-49fd-bbf8-28ac514178e5, org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+      ", preCombineFieldValue, EVENT_TIME_ORDERING, eeb8d96f-b1e4-49fd-bbf8-28ac514178e5, org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+      "org.apache.hudi.common.model.DefaultHoodieRecordPayload, preCombineFieldValue, EVENT_TIME_ORDERING,"
+          + " eeb8d96f-b1e4-49fd-bbf8-28ac514178e5, org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+      ", , COMMIT_TIME_ORDERING, ce9acb64-bde0-424c-9b91-f6ebba25356d, org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"
Review Comment:
Can we add an entry for the MDT payload (HoodieMetadataPayload) as well?
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java:
##########
@@ -122,4 +115,42 @@ void testPropertyUpgrade() {
     assertTrue(tablePropsToAdd.containsKey(KEY_GENERATOR_CLASS_NAME));
     assertTrue(tablePropsToAdd.containsKey(KEY_GENERATOR_TYPE));
   }
+
+  @ParameterizedTest
+  @CsvSource({
+      "com.example.CustomPayload, , CUSTOM, 00000000-0000-0000-0000-000000000000, com.example.CustomPayload",
+      "com.example.CustomPayload, preCombineFieldValue, CUSTOM, 00000000-0000-0000-0000-000000000000, com.example.CustomPayload",
Review Comment:
Is it possible to reuse or define a constant for each strategy ID and use it
here, rather than hardcoding the string values?
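
A minimal sketch of how that could look, under stated assumptions. Annotation
values such as the entries of `@CsvSource` must be compile-time constants, and
a `static final String` concatenated into a row literal still qualifies. The
class name and the three constant names below are hypothetical; the real test
would presumably reference constants on `HoodieRecordMerger`, of which only
`PAYLOAD_BASED_MERGE_STRATEGY_UUID` appears in this diff. The UUID values are
copied from the rows above.

```java
// Hypothetical stand-in for the test class; not part of the PR.
class StrategyIdRows {
  // Hypothetical local constants; the values are copied from the CSV rows in this diff.
  static final String PAYLOAD_BASED_ID = "00000000-0000-0000-0000-000000000000";
  static final String COMMIT_TIME_ID = "ce9acb64-bde0-424c-9b91-f6ebba25356d";
  static final String EVENT_TIME_ID = "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5";

  // Each row is a constant expression, so the same concatenation is also
  // legal directly inside @CsvSource({ ... }).
  static final String CUSTOM_ROW =
      "com.example.CustomPayload, , CUSTOM, " + PAYLOAD_BASED_ID + ", com.example.CustomPayload";
  static final String COMMIT_TIME_ROW =
      ", , COMMIT_TIME_ORDERING, " + COMMIT_TIME_ID
          + ", org.apache.hudi.common.model.OverwriteWithLatestAvroPayload";
  static final String EVENT_TIME_ROW =
      ", preCombineFieldValue, EVENT_TIME_ORDERING, " + EVENT_TIME_ID
          + ", org.apache.hudi.common.model.DefaultHoodieRecordPayload";

  public static void main(String[] args) {
    // Print the rows to show they match the hardcoded literals.
    System.out.println(CUSTOM_ROW);
    System.out.println(COMMIT_TIME_ROW);
    System.out.println(EVENT_TIME_ROW);
  }
}
```

The rows stay readable, and a change to a strategy ID would only need to be
made in one place.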
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java:
##########
@@ -178,23 +179,68 @@ static void upgradePartitionFields(HoodieWriteConfig config, HoodieTableConfig t
   }
   static void upgradeMergeMode(HoodieTableConfig tableConfig, Map<ConfigProperty, String> tablePropsToAdd) {
-    if (tableConfig.getPayloadClass() != null
-        && tableConfig.getPayloadClass().equals(OverwriteWithLatestAvroPayload.class.getName())) {
-      if (HoodieTableType.COPY_ON_WRITE == tableConfig.getTableType()) {
+    String payloadClass = tableConfig.getPayloadClass();
+    String preCombineField = tableConfig.getPreCombineField();
+    if (isCustomPayloadClass(payloadClass)) {
+      // This contains a special case: HoodieMetadataPayload.
+      tablePropsToAdd.put(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME,
+          payloadClass);
+      tablePropsToAdd.put(
+          HoodieTableConfig.RECORD_MERGE_MODE,
+          RecordMergeMode.CUSTOM.name());
+      tablePropsToAdd.put(
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID,
+          HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+    } else if (StringUtils.nonEmpty(preCombineField)) {
+      // This contains a special case: OverwriteWithLatestPayload with preCombine field.
+      tablePropsToAdd.put(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME,
+          DefaultHoodieRecordPayload.class.getName());
Review Comment:
Oh, is it possible to leave the payload class as is and just set the merge
mode to the event-time based merge mode?
If not, we might break existing pipelines.
After the upgrade, the user's write config could still have
OverwriteWithLatestAvroPayload while the table config says
DefaultHoodieRecordPayload, and table config validation might fail.
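
A minimal sketch of that alternative for the preCombine branch only, under
stated assumptions. It uses plain string keys in place of the
`HoodieTableConfig` properties so that it is self-contained; the class name,
method name, and key strings are hypothetical. Whether the resulting
combination is a valid table config is exactly the open question, so this
illustrates the proposal rather than a verified fix.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not the PR's implementation.
class MergeModeUpgradeSketch {
  // Hypothetical keys standing in for HoodieTableConfig.PAYLOAD_CLASS_NAME
  // and HoodieTableConfig.RECORD_MERGE_MODE.
  static final String PAYLOAD_CLASS_KEY = "payload.class";
  static final String MERGE_MODE_KEY = "merge.mode";

  // Keeps the existing payload class untouched and only records the merge mode
  // when a preCombine field is configured.
  static Map<String, String> upgradeKeepingPayload(String payloadClass, String preCombineField) {
    Map<String, String> propsToAdd = new HashMap<>();
    if (preCombineField != null && !preCombineField.isEmpty()) {
      if (payloadClass != null && !payloadClass.isEmpty()) {
        // Payload class is carried over as is, so it still matches the write config.
        propsToAdd.put(PAYLOAD_CLASS_KEY, payloadClass);
      }
      propsToAdd.put(MERGE_MODE_KEY, "EVENT_TIME_ORDERING");
    }
    return propsToAdd;
  }

  public static void main(String[] args) {
    System.out.println(upgradeKeepingPayload(
        "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload", "ts"));
  }
}
```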
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java:
##########
@@ -133,7 +138,7 @@ public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEng
     // downgrade table properties
     downgradePartitionFields(config, metaClient.getTableConfig(), tablePropsToAdd);
     unsetInitialVersion(metaClient.getTableConfig(), tablePropsToAdd);
-    unsetRecordMergeMode(metaClient.getTableConfig(), tablePropsToAdd);
+    List<ConfigProperty> configsToRemove = unsetRecordMergeMode(config, metaClient.getTableConfig(), tablePropsToAdd);
Review Comment:
Let's name this consistently with the others: tablePropsToRemove.
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java:
##########
@@ -122,4 +115,42 @@ void testPropertyUpgrade() {
     assertTrue(tablePropsToAdd.containsKey(KEY_GENERATOR_CLASS_NAME));
     assertTrue(tablePropsToAdd.containsKey(KEY_GENERATOR_TYPE));
   }
+
+  @ParameterizedTest
+  @CsvSource({
+      "com.example.CustomPayload, , CUSTOM, 00000000-0000-0000-0000-000000000000, com.example.CustomPayload",
+      "com.example.CustomPayload, preCombineFieldValue, CUSTOM, 00000000-0000-0000-0000-000000000000, com.example.CustomPayload",
+      "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload, , COMMIT_TIME_ORDERING, ce9acb64-bde0-424c-9b91-f6ebba25356d, org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
+      "org.apache.hudi.common.model.DefaultHoodieRecordPayload, , EVENT_TIME_ORDERING, eeb8d96f-b1e4-49fd-bbf8-28ac514178e5, org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+      ", preCombineFieldValue, EVENT_TIME_ORDERING, eeb8d96f-b1e4-49fd-bbf8-28ac514178e5, org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+      "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload, preCombineFieldValue, EVENT_TIME_ORDERING,"
+          + " eeb8d96f-b1e4-49fd-bbf8-28ac514178e5, org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+      ", preCombineFieldValue, EVENT_TIME_ORDERING, eeb8d96f-b1e4-49fd-bbf8-28ac514178e5, org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+      "org.apache.hudi.common.model.DefaultHoodieRecordPayload, preCombineFieldValue, EVENT_TIME_ORDERING,"
+          + " eeb8d96f-b1e4-49fd-bbf8-28ac514178e5, org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+      ", , COMMIT_TIME_ORDERING, ce9acb64-bde0-424c-9b91-f6ebba25356d, org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"
+  })
+  void testUpgradeMergeMode(String payloadClass, String preCombineField, String expectedMergeMode, String expectedStrategy, String expectedPayloadClass) {
+    HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
+    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+
+    when(tableConfig.getPayloadClass()).thenReturn(payloadClass);
+    when(tableConfig.getPreCombineField()).thenReturn(preCombineField);
+
+    SevenToEightUpgradeHandler.upgradeMergeMode(tableConfig, tablePropsToAdd);
+
+    assertEquals(expectedMergeMode, tablePropsToAdd.get(HoodieTableConfig.RECORD_MERGE_MODE));
+    assertEquals(expectedStrategy, tablePropsToAdd.get(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID));
+    if (expectedPayloadClass != null) {
+      assertEquals(expectedPayloadClass, tablePropsToAdd.get(HoodieTableConfig.PAYLOAD_CLASS_NAME));
+    }
Review Comment:
If the expected payload class is null, should we assert that tablePropsToAdd
does not contain it?
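
A minimal sketch of the two-sided check, under stated assumptions. It is
written as plain Java with a string key so that it runs without JUnit or Hudi
on the classpath; the class name, helper name, and key are hypothetical. In
the actual test the else branch would presumably be
`assertFalse(tablePropsToAdd.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME))`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not part of the PR.
class PayloadAssertionSketch {
  // Hypothetical key standing in for HoodieTableConfig.PAYLOAD_CLASS_NAME.
  static final String PAYLOAD_CLASS_KEY = "payload.class";

  // Fails if the map disagrees with the expectation in either direction.
  static void checkPayloadClass(String expectedPayloadClass, Map<String, String> tablePropsToAdd) {
    if (expectedPayloadClass != null) {
      if (!expectedPayloadClass.equals(tablePropsToAdd.get(PAYLOAD_CLASS_KEY))) {
        throw new AssertionError("unexpected payload class: " + tablePropsToAdd.get(PAYLOAD_CLASS_KEY));
      }
    } else if (tablePropsToAdd.containsKey(PAYLOAD_CLASS_KEY)) {
      // No payload class expected, so the key must be absent from the map.
      throw new AssertionError("payload class should not be set");
    }
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    checkPayloadClass(null, props); // passes: key is absent
    props.put(PAYLOAD_CLASS_KEY, "com.example.CustomPayload");
    checkPayloadClass("com.example.CustomPayload", props); // passes: values match
  }
}
```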
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java:
##########
@@ -178,23 +179,68 @@ static void upgradePartitionFields(HoodieWriteConfig config, HoodieTableConfig t
   }
   static void upgradeMergeMode(HoodieTableConfig tableConfig, Map<ConfigProperty, String> tablePropsToAdd) {
-    if (tableConfig.getPayloadClass() != null
-        && tableConfig.getPayloadClass().equals(OverwriteWithLatestAvroPayload.class.getName())) {
-      if (HoodieTableType.COPY_ON_WRITE == tableConfig.getTableType()) {
+    String payloadClass = tableConfig.getPayloadClass();
+    String preCombineField = tableConfig.getPreCombineField();
+    if (isCustomPayloadClass(payloadClass)) {
+      // This contains a special case: HoodieMetadataPayload.
+      tablePropsToAdd.put(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME,
+          payloadClass);
+      tablePropsToAdd.put(
+          HoodieTableConfig.RECORD_MERGE_MODE,
+          RecordMergeMode.CUSTOM.name());
+      tablePropsToAdd.put(
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID,
+          HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+    } else if (StringUtils.nonEmpty(preCombineField)) {
+      // This contains a special case: OverwriteWithLatestPayload with preCombine field.
+      tablePropsToAdd.put(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME,
+          DefaultHoodieRecordPayload.class.getName());
Review Comment:
@yihua: is this a valid state for a given table config? If yes, which one
takes precedence?
That is, payload class set to OverwriteWithLatestAvroPayload and merge mode
set to the event-time based merge mode.