nsivabalan commented on code in PR #13519:
URL: https://github.com/apache/hudi/pull/13519#discussion_r2196406167
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -20,17 +20,50 @@
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
-public class EightToNineUpgradeHandler implements UpgradeHandler {
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_PROPERTIES;
+public class EightToNineUpgradeHandler implements UpgradeHandler {
@Override
- public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config,
HoodieEngineContext context,
- String instantTime,
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-
- return Collections.emptyMap();
+ public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config,
+ HoodieEngineContext context,
+ String instantTime,
+ SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
+ Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+ HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ HoodieTableConfig tableConfig = metaClient.getTableConfig();
+ String payloadClass = tableConfig.getPayloadClass();
+
+ String partialUpdateProperties = tableConfig.getPartialUpdateProperties();
+ if (!StringUtils.isNullOrEmpty(payloadClass)) {
+ if (payloadClass.equals(AWSDmsAvroPayload.class.getName())) {
+ String propertiesToAdd = DELETE_KEY + "=Op," + DELETE_MARKER + "=D";
+ partialUpdateProperties =
StringUtils.isNullOrEmpty(partialUpdateProperties)
+ ? propertiesToAdd : partialUpdateProperties + "," +
partialUpdateProperties;
+ } else if
(payloadClass.equals(PostgresDebeziumAvroPayload.class.getName())) {
+ String propertiesToAdd =
+ PARTIAL_UPDATE_CUSTOM_MARKER + "=" + DEBEZIUM_UNAVAILABLE_VALUE;
+ partialUpdateProperties =
StringUtils.isNullOrEmpty(partialUpdateProperties)
+ ? propertiesToAdd : partialUpdateProperties + "," +
partialUpdateProperties;
+ }
+ }
Review Comment:
Why are we not setting the merge mode and the partial update mode in the
table config based on the payload class? That's the core fix we wanted to make,
right? It is what will help in migrating the payload to use the merge mode.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]