linliu-code commented on code in PR #13519:
URL: https://github.com/apache/hudi/pull/13519#discussion_r2211540687
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -20,17 +20,50 @@
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
-public class EightToNineUpgradeHandler implements UpgradeHandler {
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_PROPERTIES;
+public class EightToNineUpgradeHandler implements UpgradeHandler {
@Override
- public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config,
HoodieEngineContext context,
- String instantTime,
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-
- return Collections.emptyMap();
+ public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config,
+ HoodieEngineContext context,
+ String instantTime,
+ SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
+ Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+ HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ HoodieTableConfig tableConfig = metaClient.getTableConfig();
+ String payloadClass = tableConfig.getPayloadClass();
+
+ String partialUpdateProperties = tableConfig.getPartialUpdateProperties();
+ if (!StringUtils.isNullOrEmpty(payloadClass)) {
+ if (payloadClass.equals(AWSDmsAvroPayload.class.getName())) {
+ String propertiesToAdd = DELETE_KEY + "=Op," + DELETE_MARKER + "=D";
+ partialUpdateProperties =
StringUtils.isNullOrEmpty(partialUpdateProperties)
+ ? propertiesToAdd : partialUpdateProperties + "," +
partialUpdateProperties;
+ } else if
(payloadClass.equals(PostgresDebeziumAvroPayload.class.getName())) {
+ String propertiesToAdd =
+ PARTIAL_UPDATE_CUSTOM_MARKER + "=" + DEBEZIUM_UNAVAILABLE_VALUE;
+ partialUpdateProperties =
StringUtils.isNullOrEmpty(partialUpdateProperties)
+ ? propertiesToAdd : partialUpdateProperties + "," +
partialUpdateProperties;
+ }
+ }
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]