This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a06b8ebb603 [HUDI-8401] Addressing some follow up feedback on PR 
13519 (#13721)
0a06b8ebb603 is described below

commit 0a06b8ebb6032b82be75a16572813c43d073c35d
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Aug 15 22:10:28 2025 -0700

    [HUDI-8401] Addressing some follow up feedback on PR 13519 (#13721)
---
 .../table/upgrade/EightToNineUpgradeHandler.java   | 11 ++---
 .../table/upgrade/NineToEightDowngradeHandler.java |  4 +-
 .../upgrade/TestEightToNineUpgradeHandler.java     | 28 ++++++-----
 .../upgrade/TestNineToEightDowngradeHandler.java   |  4 +-
 .../hudi/common/table/HoodieTableConfig.java       | 23 +++++----
 .../hudi/common/table/PartialUpdateMode.java       | 19 ++------
 .../table/read/BufferedRecordMergerFactory.java    | 36 +++++++--------
 ...dateStrategy.java => PartialUpdateHandler.java} | 20 ++++----
 .../buffer/DefaultFileGroupRecordBufferLoader.java | 10 ++--
 .../table/read/buffer/FileGroupRecordBuffer.java   |  8 ++--
 .../read/buffer/KeyBasedFileGroupRecordBuffer.java |  6 +--
 .../buffer/PositionBasedFileGroupRecordBuffer.java |  6 +--
 .../ReusableFileGroupRecordBufferLoader.java       |  6 +--
 .../read/buffer/ReusableKeyBasedRecordBuffer.java  |  4 +-
 .../SortedKeyBasedFileGroupRecordBuffer.java       |  4 +-
 .../StreamingFileGroupRecordBufferLoader.java      |  6 +--
 .../read/buffer/UnmergedFileGroupRecordBuffer.java |  4 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  3 +-
 ...Strategy.java => TestPartialUpdateHandler.java} | 12 ++---
 .../read/buffer/BaseTestFileGroupRecordBuffer.java |  3 +-
 .../read/buffer/TestFileGroupRecordBuffer.java     |  3 +-
 .../buffer/TestFileGroupRecordBufferLoader.java    |  1 +
 .../buffer/TestKeyBasedFileGroupRecordBuffer.java  | 11 ++---
 .../buffer/TestReusableKeyBasedRecordBuffer.java   |  3 +-
 .../TestSortedKeyBasedFileGroupRecordBuffer.java   |  6 +--
 ...TestStreamingKeyBasedFileGroupRecordBuffer.java |  9 ++--
 .../hudi/common/table/TestHoodieTableConfig.java   | 14 +++---
 .../org/apache/hudi/cdc/CDCFileGroupIterator.scala |  4 +-
 .../hudi/functional/TestBufferedRecordMerger.java  | 54 +++++++++++-----------
 .../hudi/functional/TestEightToNineUpgrade.scala   | 21 +++++----
 .../functional/TestPayloadDeprecationFlow.scala    |  4 +-
 31 files changed, 165 insertions(+), 182 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
index 5d3c54d69d9b..ac4609983427 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
@@ -52,7 +52,7 @@ import static 
org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_M
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
-import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
 import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
@@ -160,9 +160,6 @@ public class EightToNineUpgradeHandler implements 
UpgradeHandler {
 
   private void reconcilePartialUpdateModeConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
                                                 HoodieTableConfig tableConfig) 
{
-    // Set partial update mode for all tables.
-    tablePropsToAdd.put(PARTIAL_UPDATE_MODE, PartialUpdateMode.NONE.name()); 
// to be fixed once we land PR #13721
-
     String payloadClass = tableConfig.getPayloadClass();
     String mergeStrategy = tableConfig.getRecordMergeStrategyId();
     if (!BUILTIN_MERGE_STRATEGIES.contains(mergeStrategy) || 
StringUtils.isNullOrEmpty(payloadClass)) {
@@ -172,9 +169,7 @@ public class EightToNineUpgradeHandler implements 
UpgradeHandler {
         || payloadClass.equals(PartialUpdateAvroPayload.class.getName())) {
       tablePropsToAdd.put(PARTIAL_UPDATE_MODE, 
PartialUpdateMode.IGNORE_DEFAULTS.name());
     } else if 
(payloadClass.equals(PostgresDebeziumAvroPayload.class.getName())) {
-      tablePropsToAdd.put(PARTIAL_UPDATE_MODE, 
PartialUpdateMode.IGNORE_MARKERS.name()); // to be fixed once we land PR #13721.
-    } else {
-      tablePropsToAdd.put(PARTIAL_UPDATE_MODE, PartialUpdateMode.NONE.name()); 
// to be fixed once we land PR #13721.
+      tablePropsToAdd.put(PARTIAL_UPDATE_MODE, 
PartialUpdateMode.FILL_UNAVAILABLE.name());
     }
   }
 
@@ -194,7 +189,7 @@ public class EightToNineUpgradeHandler implements 
UpgradeHandler {
           AWSDmsAvroPayload.DELETE_OPERATION_VALUE);
     } else if 
(payloadClass.equals(PostgresDebeziumAvroPayload.class.getName())) {
       tablePropsToAdd.put(
-          ConfigProperty.key(RECORD_MERGE_PROPERTY_PREFIX + 
PARTIAL_UPDATE_CUSTOM_MARKER).noDefaultValue(), // // to be fixed once we land 
PR #13721.
+          ConfigProperty.key(RECORD_MERGE_PROPERTY_PREFIX + 
PARTIAL_UPDATE_UNAVAILABLE_VALUE).noDefaultValue(), // // to be fixed once we 
land PR #13721.
           DEBEZIUM_UNAVAILABLE_VALUE);
     }
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java
index af927926785c..730cdc84d400 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java
@@ -42,7 +42,7 @@ import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY
 import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
-import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
 import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
@@ -108,7 +108,7 @@ public class NineToEightDowngradeHandler implements 
DowngradeHandler {
       }
       if 
(legacyPayloadClass.equals(PostgresDebeziumAvroPayload.class.getName())) {
         propertiesToRemove.add(
-            ConfigProperty.key(RECORD_MERGE_PROPERTY_PREFIX + 
PARTIAL_UPDATE_CUSTOM_MARKER).noDefaultValue());
+            ConfigProperty.key(RECORD_MERGE_PROPERTY_PREFIX + 
PARTIAL_UPDATE_UNAVAILABLE_VALUE).noDefaultValue());
       }
     }
   }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
index ed6c8edd55e1..424273082413 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
@@ -69,12 +69,12 @@ import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY
 import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
-import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
 import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX;
-import static org.apache.hudi.common.table.PartialUpdateMode.IGNORE_MARKERS;
+import static org.apache.hudi.common.table.PartialUpdateMode.FILL_UNAVAILABLE;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -98,7 +98,7 @@ class TestEightToNineUpgradeHandler {
   private final SupportsUpgradeDowngrade upgradeDowngradeHelper =
       mock(SupportsUpgradeDowngrade.class);
   private final HoodieWriteConfig config = mock(HoodieWriteConfig.class);
-  private static final Map<ConfigProperty, String> DEFAULT_CONFIG_UPDATED = 
Collections.singletonMap(PARTIAL_UPDATE_MODE, PartialUpdateMode.NONE.name());
+  private static final Map<ConfigProperty, String> DEFAULT_CONFIG_UPDATED = 
Collections.emptyMap();
   private static final Set<ConfigProperty> DEFAULT_CONFIG_REMOVED = 
Collections.emptySet();
   private static final UpgradeDowngrade.TableConfigChangeSet 
DEFAULT_UPGRADE_RESULT =
       new UpgradeDowngrade.TableConfigChangeSet(DEFAULT_CONFIG_UPDATED, 
DEFAULT_CONFIG_REMOVED);
@@ -143,21 +143,21 @@ class TestEightToNineUpgradeHandler {
             DefaultHoodieRecordPayload.class.getName(),
             "",
             null,
-            PartialUpdateMode.NONE.name(),
+            null,
             "DefaultHoodieRecordPayload"
         ),
         Arguments.of(
             EventTimeAvroPayload.class.getName(),
             "",
             EVENT_TIME_ORDERING.name(),
-            PartialUpdateMode.NONE.name(),
+            null,
             "EventTimeAvroPayload"
         ),
         Arguments.of(
             OverwriteWithLatestAvroPayload.class.getName(),
             "",
             null,
-            PartialUpdateMode.NONE.name(),
+            null,
             "OverwriteWithLatestAvroPayload"
         ),
         Arguments.of(
@@ -165,15 +165,15 @@ class TestEightToNineUpgradeHandler {
             RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=Op,"
                 + RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=D", // 
mergeProperties
             COMMIT_TIME_ORDERING.name(),
-            PartialUpdateMode.NONE.name(),
+            null,
             "AWSDmsAvroPayload"
         ),
         Arguments.of(
             PostgresDebeziumAvroPayload.class.getName(),
-            RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_CUSTOM_MARKER
+            RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE
                 + "=" + DEBEZIUM_UNAVAILABLE_VALUE,
             EVENT_TIME_ORDERING.name(),
-            IGNORE_MARKERS.name(),
+            FILL_UNAVAILABLE.name(),
             "PostgresDebeziumAvroPayload"
         ),
         Arguments.of(
@@ -187,7 +187,7 @@ class TestEightToNineUpgradeHandler {
             MySqlDebeziumAvroPayload.class.getName(),
             "",
             EVENT_TIME_ORDERING.name(),
-            PartialUpdateMode.NONE.name(),
+            null,
             "MySqlDebeziumAvroPayload"
         ),
         Arguments.of(
@@ -241,8 +241,12 @@ class TestEightToNineUpgradeHandler {
         assertEquals(expectedRecordMergeMode, 
propertiesToAdd.get(RECORD_MERGE_MODE));
       }
       // Assert partial update mode
-      assertTrue(propertiesToAdd.containsKey(PARTIAL_UPDATE_MODE));
-      assertEquals(expectedPartialUpdateMode, 
propertiesToAdd.get(PARTIAL_UPDATE_MODE));
+      if (expectedPartialUpdateMode != null) {
+        assertTrue(propertiesToAdd.containsKey(PARTIAL_UPDATE_MODE));
+        assertEquals(expectedPartialUpdateMode, 
propertiesToAdd.get(PARTIAL_UPDATE_MODE));
+      } else {
+        assertFalse(propertiesToAdd.containsKey(PARTIAL_UPDATE_MODE));
+      }
       // Assert payload class change
       assertPayloadClassChange(propertiesToAdd, propertiesToRemove, 
payloadClassName);
     }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java
index 6713f297afd6..0e37b9c8a717 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java
@@ -63,8 +63,8 @@ import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY
 import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
-import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
 import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX;
@@ -146,7 +146,7 @@ class TestNineToEightDowngradeHandler {
         Arguments.of(
             PostgresDebeziumAvroPayload.class.getName(),
             3,
-            LEGACY_PAYLOAD_CLASS_NAME.key() + "," + PARTIAL_UPDATE_MODE.key() 
+ "," + RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_CUSTOM_MARKER,
+            LEGACY_PAYLOAD_CLASS_NAME.key() + "," + PARTIAL_UPDATE_MODE.key() 
+ "," + RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE,
             3,
             true,
             true,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 693eac723318..f6463e4280ec 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -130,7 +130,7 @@ public class HoodieTableConfig extends HoodieConfig {
   public static final String HOODIE_PROPERTIES_FILE_BACKUP = 
"hoodie.properties.backup";
   public static final String HOODIE_WRITE_TABLE_NAME_KEY = 
"hoodie.datasource.write.table.name";
   public static final String HOODIE_TABLE_NAME_KEY = "hoodie.table.name";
-  public static final String PARTIAL_UPDATE_CUSTOM_MARKER = 
"hoodie.write.partial.update.custom.marker";
+  public static final String PARTIAL_UPDATE_UNAVAILABLE_VALUE = 
"hoodie.write.partial.update.unavailable.value";
   public static final String DEBEZIUM_UNAVAILABLE_VALUE = 
"__debezium_unavailable_value";
   // This prefix is used to set merging related properties.
   // A reader might need to read some writer properties to function as 
expected,
@@ -363,12 +363,11 @@ public class HoodieTableConfig extends HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("When set to true, the table can support reading and 
writing multiple base file formats.");
 
-  public static final ConfigProperty<PartialUpdateMode> PARTIAL_UPDATE_MODE = 
ConfigProperty
+  public static final ConfigProperty<String> PARTIAL_UPDATE_MODE = 
ConfigProperty
       .key("hoodie.table.partial.update.mode")
-      .defaultValue(PartialUpdateMode.NONE)
+      .noDefaultValue()
       .sinceVersion("1.1.0")
-      .withDocumentation("This property when set, will define how two versions 
of the record will be "
-          + "merged together where the later contains only partial set of 
values and not entire record.");
+      .withDocumentation("This property when set, will define how two versions 
of the record will be merged together when records are partially formed");
 
   public static final ConfigProperty<String> URL_ENCODE_PARTITIONING = 
KeyGeneratorOptions.URL_ENCODE_PARTITIONING;
   public static final ConfigProperty<String> HIVE_STYLE_PARTITIONING_ENABLE = 
KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;
@@ -871,13 +870,13 @@ public class HoodieTableConfig extends HoodieConfig {
             || 
payloadClassName.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName()))
 {
           reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), 
PartialUpdateMode.IGNORE_DEFAULTS.name());
         } else if 
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
-          reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), 
PartialUpdateMode.IGNORE_MARKERS.name());
+          reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), 
PartialUpdateMode.FILL_UNAVAILABLE.name());
         }
         // Additional custom merge properties.
         // Cretain payloads are migrated to non payload way from 1.1 Hudi 
binary and the reader might need certain properties for the
         // merge to function as expected. Handing such special cases here.
         if 
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
-          reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + 
PARTIAL_UPDATE_CUSTOM_MARKER, DEBEZIUM_UNAVAILABLE_VALUE);
+          reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + 
PARTIAL_UPDATE_UNAVAILABLE_VALUE, DEBEZIUM_UNAVAILABLE_VALUE);
         } else if (payloadClassName.equals(AWSDmsAvroPayload.class.getName())) 
{
           reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY, 
OP_FIELD);
           reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER, 
DELETE_OPERATION_VALUE);
@@ -1196,12 +1195,16 @@ public class HoodieTableConfig extends HoodieConfig {
             CONFIG_VALUES_DELIMITER));
   }
 
-  public PartialUpdateMode getPartialUpdateMode() {
+  public Option<PartialUpdateMode> getPartialUpdateMode() {
     if (getTableVersion().greaterThanOrEquals(HoodieTableVersion.NINE)) {
-      return 
PartialUpdateMode.valueOf(getStringOrDefault(PARTIAL_UPDATE_MODE));
+      if (contains(PARTIAL_UPDATE_MODE)) {
+        return 
Option.of(PartialUpdateMode.valueOf(getString(PARTIAL_UPDATE_MODE)));
+      } else {
+        return Option.empty();
+      }
     } else {
       // For table version <= 8, partial update is not supported.
-      return PartialUpdateMode.NONE;
+      return Option.empty();
     }
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java
index d5622104e955..c0701f0459f5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java
@@ -22,18 +22,6 @@ package org.apache.hudi.common.table;
 import org.apache.hudi.common.config.EnumFieldDescription;
 
 public enum PartialUpdateMode {
-  @EnumFieldDescription(
-      "No partial update logic should be employed.")
-  NONE,
-
-  @EnumFieldDescription(
-      "For any column values missing in current record, pick value from 
previous version of the record.")
-  KEEP_VALUES,
-
-  @EnumFieldDescription(
-      "For column values missing in current record, pick the default value 
from the schema.")
-  FILL_DEFAULTS,
-
   @EnumFieldDescription(
       "For columns having default values set in current record, pick the value 
from previous version of the record."
       + "Only top level data type default is checked, which means this mode 
does not check leaf level data type default"
@@ -41,8 +29,7 @@ public enum PartialUpdateMode {
   IGNORE_DEFAULTS,
 
   @EnumFieldDescription(
-      "For columns having marker in the current record, pick value from 
previous version of the record during write."
-      + "Marker value can be defined using 
`hoodie.write.partial.update.custom.marker`, which should be added to"
-      + "the value of table config `hoodie.write.partial.update.properties`.")
-  IGNORE_MARKERS
+      "For columns having unavailable values in the current record, pick value 
from previous version of the record during write. "
+         + "Unavailable value can be defined using 
`hoodie.write.partial.update.unavailable.value` in the table property.")
+  FILL_UNAVAILABLE
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
index 8a12a13ec9cc..7ccc4c81d02d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
@@ -57,9 +57,9 @@ public class BufferedRecordMergerFactory {
                                                    Option<String> payloadClass,
                                                    Schema readerSchema,
                                                    TypedProperties props,
-                                                   PartialUpdateMode 
partialUpdateMode) {
+                                                   Option<PartialUpdateMode> 
partialUpdateModeOpt) {
     return create(readerContext, recordMergeMode, enablePartialMerging, 
recordMerger,
-        orderingFieldNames, readerSchema, payloadClass.map(p -> Pair.of(p, 
p)), props, partialUpdateMode);
+        orderingFieldNames, readerSchema, payloadClass.map(p -> Pair.of(p, 
p)), props, partialUpdateModeOpt);
   }
 
   public static <T> BufferedRecordMerger<T> create(HoodieReaderContext<T> 
readerContext,
@@ -70,7 +70,7 @@ public class BufferedRecordMergerFactory {
                                                    Schema readerSchema,
                                                    Option<Pair<String, 
String>> payloadClasses,
                                                    TypedProperties props,
-                                                   PartialUpdateMode 
partialUpdateMode) {
+                                                   Option<PartialUpdateMode> 
partialUpdateModeOpt) {
     /**
      * This part implements KEEP_VALUES partial update mode, which merges two 
records that do not have all columns.
      * Other Partial update modes, like IGNORE_DEFAULTS assume all columns 
exists in the record,
@@ -78,21 +78,21 @@ public class BufferedRecordMergerFactory {
      */
     if (enablePartialMerging) {
       BufferedRecordMerger<T> deleteRecordMerger = create(
-          readerContext, recordMergeMode, false, recordMerger, 
orderingFieldNames, readerSchema, payloadClasses, props, partialUpdateMode);
+          readerContext, recordMergeMode, false, recordMerger, 
orderingFieldNames, readerSchema, payloadClasses, props, Option.empty());
       return new 
PartialUpdateBufferedRecordMerger<>(readerContext.getRecordContext(), 
recordMerger, deleteRecordMerger, orderingFieldNames, readerSchema, props);
     }
 
     switch (recordMergeMode) {
       case COMMIT_TIME_ORDERING:
-        if (partialUpdateMode == PartialUpdateMode.NONE) {
+        if (partialUpdateModeOpt.isEmpty()) {
           return new CommitTimeRecordMerger<>();
         }
-        return new 
CommitTimePartialRecordMerger<>(readerContext.getRecordContext(), 
partialUpdateMode, props);
+        return new 
CommitTimePartialRecordMerger<>(readerContext.getRecordContext(), 
partialUpdateModeOpt.get(), props);
       case EVENT_TIME_ORDERING:
-        if (partialUpdateMode == PartialUpdateMode.NONE) {
+        if (partialUpdateModeOpt.isEmpty()) {
           return new EventTimeRecordMerger<>();
         }
-        return new 
EventTimePartialRecordMerger<>(readerContext.getRecordContext(), 
partialUpdateMode, props);
+        return new 
EventTimePartialRecordMerger<>(readerContext.getRecordContext(), 
partialUpdateModeOpt.get(), props);
       default:
         if (payloadClasses.isPresent()) {
           if 
(payloadClasses.get().getRight().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload"))
 {
@@ -133,14 +133,14 @@ public class BufferedRecordMergerFactory {
    * based on {@code COMMIT_TIME_ORDERING} merge mode and partial update mode.
    */
   private static class CommitTimePartialRecordMerger<T> extends 
CommitTimeRecordMerger<T> {
-    private final PartialUpdateStrategy<T> partialUpdateStrategy;
+    private final PartialUpdateHandler<T> partialUpdateHandler;
     private final RecordContext<T> recordContext;
 
     public CommitTimePartialRecordMerger(RecordContext<T> recordContext,
                                          PartialUpdateMode partialUpdateMode,
                                          TypedProperties props) {
       super();
-      this.partialUpdateStrategy = new PartialUpdateStrategy<>(recordContext, 
partialUpdateMode, props);
+      this.partialUpdateHandler = new PartialUpdateHandler<>(recordContext, 
partialUpdateMode, props);
       this.recordContext = recordContext;
     }
 
@@ -148,7 +148,7 @@ public class BufferedRecordMergerFactory {
     public Option<BufferedRecord<T>> deltaMerge(BufferedRecord<T> newRecord,
                                                 BufferedRecord<T> 
existingRecord) {
       if (existingRecord != null) {
-        newRecord = partialUpdateStrategy.partialMerge(
+        newRecord = partialUpdateHandler.partialMerge(
             newRecord,
             existingRecord,
             recordContext.getSchemaFromBufferRecord(newRecord),
@@ -161,7 +161,7 @@ public class BufferedRecordMergerFactory {
     @Override
     public BufferedRecord<T> finalMerge(BufferedRecord<T> olderRecord,
                                      BufferedRecord<T> newerRecord) {
-      newerRecord = partialUpdateStrategy.partialMerge(
+      newerRecord = partialUpdateHandler.partialMerge(
           newerRecord,
           olderRecord,
           recordContext.getSchemaFromBufferRecord(newerRecord),
@@ -203,13 +203,13 @@ public class BufferedRecordMergerFactory {
    * based on {@code EVENT_TIME_ORDERING} merge mode and partial update mode.
    */
   private static class EventTimePartialRecordMerger<T> extends 
EventTimeRecordMerger<T> {
-    private final PartialUpdateStrategy<T> partialUpdateStrategy;
+    private final PartialUpdateHandler<T> partialUpdateHandler;
     private final RecordContext<T> recordContext;
 
     public EventTimePartialRecordMerger(RecordContext<T> recordContext,
                                         PartialUpdateMode partialUpdateMode,
                                         TypedProperties props) {
-      this.partialUpdateStrategy = new PartialUpdateStrategy<>(recordContext, 
partialUpdateMode, props);
+      this.partialUpdateHandler = new PartialUpdateHandler<>(recordContext, 
partialUpdateMode, props);
       this.recordContext = recordContext;
     }
 
@@ -218,7 +218,7 @@ public class BufferedRecordMergerFactory {
       if (existingRecord == null) {
         return Option.of(newRecord);
       } else if (shouldKeepNewerRecord(existingRecord, newRecord)) {
-        newRecord = partialUpdateStrategy.partialMerge(
+        newRecord = partialUpdateHandler.partialMerge(
             newRecord,
             existingRecord,
             recordContext.getSchemaFromBufferRecord(newRecord),
@@ -227,7 +227,7 @@ public class BufferedRecordMergerFactory {
         return Option.of(newRecord);
       } else {
         // Use existing record as the base record since existing record has 
higher ordering value.
-        existingRecord = partialUpdateStrategy.partialMerge(
+        existingRecord = partialUpdateHandler.partialMerge(
             existingRecord,
             newRecord,
             recordContext.getSchemaFromBufferRecord(existingRecord),
@@ -248,7 +248,7 @@ public class BufferedRecordMergerFactory {
       if (!olderRecord.isCommitTimeOrderingDelete()
           && oldOrderingValue.compareTo(newOrderingValue) > 0) {
         // Use old record as the base record since old record has higher 
ordering value.
-        olderRecord = partialUpdateStrategy.partialMerge(
+        olderRecord = partialUpdateHandler.partialMerge(
             olderRecord,
             newerRecord,
             recordContext.getSchemaFromBufferRecord(olderRecord),
@@ -257,7 +257,7 @@ public class BufferedRecordMergerFactory {
         return olderRecord;
       }
 
-      newerRecord = partialUpdateStrategy.partialMerge(
+      newerRecord = partialUpdateHandler.partialMerge(
           newerRecord,
           olderRecord,
           recordContext.getSchemaFromBufferRecord(newerRecord),
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java
similarity index 93%
rename from 
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java
rename to 
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java
index 3a925cba9c58..c5ddedc1d714 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java
@@ -32,7 +32,7 @@ import java.util.Map;
 
 import static org.apache.hudi.avro.HoodieAvroUtils.toJavaDefaultValue;
 import static 
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS;
-import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX;
 import static org.apache.hudi.common.util.ConfigUtils.extractWithPrefix;
 
@@ -42,14 +42,14 @@ import static 
org.apache.hudi.common.util.ConfigUtils.extractWithPrefix;
  * {@link BufferedRecordMergerFactory.CommitTimePartialRecordMerger} and
  * {@link BufferedRecordMergerFactory.EventTimePartialRecordMerger}.
  */
-public class PartialUpdateStrategy<T> implements Serializable {
+public class PartialUpdateHandler<T> implements Serializable {
   private final RecordContext<T> recordContext;
   private final PartialUpdateMode partialUpdateMode;
   private final Map<String, String> mergeProperties;
 
-  public PartialUpdateStrategy(RecordContext<T> recordContext,
-                               PartialUpdateMode partialUpdateMode,
-                               TypedProperties props) {
+  public PartialUpdateHandler(RecordContext<T> recordContext,
+                              PartialUpdateMode partialUpdateMode,
+                              TypedProperties props) {
     this.recordContext = recordContext;
     this.partialUpdateMode = partialUpdateMode;
     this.mergeProperties = parseMergeProperties(props);
@@ -67,21 +67,17 @@ public class PartialUpdateStrategy<T> implements 
Serializable {
                                  boolean keepOldMetadataColumns) {
     // Note that: When either newRecord or oldRecord is a delete record,
     //            skip partial update since delete records do not have 
meaningful columns.
-    if (partialUpdateMode == PartialUpdateMode.NONE
-        || null == oldRecord
+    if (null == oldRecord
         || newRecord.isDelete()
         || oldRecord.isDelete()) {
       return newRecord;
     }
 
     switch (partialUpdateMode) {
-      case KEEP_VALUES:
-      case FILL_DEFAULTS:
-        return newRecord;
       case IGNORE_DEFAULTS:
         return reconcileDefaultValues(
             newRecord, oldRecord, newSchema, oldSchema, 
keepOldMetadataColumns);
-      case IGNORE_MARKERS:
+      case FILL_UNAVAILABLE:
         return reconcileMarkerValues(
             newRecord, oldRecord, newSchema, oldSchema);
       default:
@@ -136,7 +132,7 @@ public class PartialUpdateStrategy<T> implements 
Serializable {
     List<Schema.Field> fields = newSchema.getFields();
     Map<Integer, Object> updateValues = new HashMap<>();
     T engineRecord;
-    String partialUpdateCustomMarker = 
mergeProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER);
+    String partialUpdateCustomMarker = 
mergeProperties.get(PARTIAL_UPDATE_UNAVAILABLE_VALUE);
     for (Schema.Field field : fields) {
       String fieldName = field.name();
       Object newValue = recordContext.getValue(newRecord.getRecord(), 
newSchema, fieldName);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
index e3fb3bfc4467..3e0609003ad3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
@@ -61,22 +61,22 @@ class DefaultFileGroupRecordBufferLoader<T> extends 
LogScanningRecordBufferLoade
                                                                             
HoodieReadStats readStats,
                                                                             
Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) {
     boolean isSkipMerge = ConfigUtils.getStringWithAltKeys(props, 
HoodieReaderConfig.MERGE_TYPE, 
true).equalsIgnoreCase(HoodieReaderConfig.REALTIME_SKIP_MERGE);
-    PartialUpdateMode partialUpdateMode = 
hoodieTableMetaClient.getTableConfig().getPartialUpdateMode();
+    Option<PartialUpdateMode> partialUpdateModeOpt = 
hoodieTableMetaClient.getTableConfig().getPartialUpdateMode();
     UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats, 
readerContext, readerParameters.emitDeletes(), fileGroupUpdateCallback, props);
     FileGroupRecordBuffer<T> recordBuffer;
     if (isSkipMerge) {
       recordBuffer = new UnmergedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateMode, props, readStats);
+          readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateModeOpt, props, readStats);
     } else if (readerParameters.sortOutputs()) {
       recordBuffer = new SortedKeyBasedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateMode, props, orderingFieldNames, updateProcessor);
+          readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
     } else if (readerParameters.useRecordPosition() && 
inputSplit.getBaseFileOption().isPresent()) {
       recordBuffer = new PositionBasedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateMode, inputSplit.getBaseFileOption().get().getCommitTime(), props,
+          readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateModeOpt, inputSplit.getBaseFileOption().get().getCommitTime(), 
props,
           orderingFieldNames, updateProcessor);
     } else {
       recordBuffer = new KeyBasedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateMode, props, orderingFieldNames, updateProcessor);
+          readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
     }
     return Pair.of(recordBuffer, scanLogFiles(readerContext, storage, 
inputSplit, hoodieTableMetaClient, props,
         readerParameters, readStats, recordBuffer));
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
index 7052ba8c59c0..cb0bf252a209 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
@@ -70,7 +70,7 @@ abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordBuffer<T
   protected final Schema readerSchema;
   protected final List<String> orderingFieldNames;
   protected final RecordMergeMode recordMergeMode;
-  protected final PartialUpdateMode partialUpdateMode;
+  protected final Option<PartialUpdateMode> partialUpdateModeOpt;
   protected final Option<HoodieRecordMerger> recordMerger;
   // The pair of payload classes represents the payload class for the table 
and the payload class for the incoming records.
   // The two classes are only expected to be different when there is a 
merge-into operation that leverages the ExpressionPayload.
@@ -93,7 +93,7 @@ abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordBuffer<T
   protected FileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
                                   HoodieTableMetaClient hoodieTableMetaClient,
                                   RecordMergeMode recordMergeMode,
-                                  PartialUpdateMode partialUpdateMode,
+                                  Option<PartialUpdateMode> 
partialUpdateModeOpt,
                                   TypedProperties props,
                                   List<String> orderingFieldNames,
                                   UpdateProcessor<T> updateProcessor) {
@@ -101,7 +101,7 @@ abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordBuffer<T
     this.updateProcessor = updateProcessor;
     this.readerSchema = 
AvroSchemaCache.intern(readerContext.getSchemaHandler().getRequiredSchema());
     this.recordMergeMode = recordMergeMode;
-    this.partialUpdateMode = partialUpdateMode;
+    this.partialUpdateModeOpt = partialUpdateModeOpt;
     this.recordMerger = readerContext.getRecordMerger();
     this.payloadClasses = readerContext.getPayloadClasses(props);
     this.orderingFieldNames = orderingFieldNames;
@@ -117,7 +117,7 @@ abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordBuffer<T
       throw new HoodieIOException("IOException when creating 
ExternalSpillableMap at " + spillableMapBasePath, e);
     }
     this.bufferedRecordMerger = BufferedRecordMergerFactory.create(
-        readerContext, recordMergeMode, enablePartialMerging, recordMerger, 
orderingFieldNames, readerSchema, payloadClasses, props, partialUpdateMode);
+        readerContext, recordMergeMode, enablePartialMerging, recordMerger, 
orderingFieldNames, readerSchema, payloadClasses, props, partialUpdateModeOpt);
     this.deleteContext = 
readerContext.getSchemaHandler().getDeleteContext().withReaderSchema(this.readerSchema);
     this.bufferedRecordConverter = 
BufferedRecordConverter.createConverter(readerContext.getIteratorMode(), 
readerSchema, readerContext.getRecordContext(), orderingFieldNames);
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/KeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/KeyBasedFileGroupRecordBuffer.java
index 5f2d35f1186a..d6663ae35223 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/KeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/KeyBasedFileGroupRecordBuffer.java
@@ -58,11 +58,11 @@ public class KeyBasedFileGroupRecordBuffer<T> extends 
FileGroupRecordBuffer<T> {
   public KeyBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
                                        HoodieTableMetaClient 
hoodieTableMetaClient,
                                        RecordMergeMode recordMergeMode,
-                                       PartialUpdateMode partialUpdateMode,
+                                       Option<PartialUpdateMode> 
partialUpdateModeOpt,
                                        TypedProperties props,
                                        List<String> orderingFieldNames,
                                        UpdateProcessor<T> updateProcessor) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, 
partialUpdateMode, props, orderingFieldNames, updateProcessor);
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, 
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
   }
 
   @Override
@@ -87,7 +87,7 @@ public class KeyBasedFileGroupRecordBuffer<T> extends 
FileGroupRecordBuffer<T> {
           readerSchema,
           payloadClasses,
           props,
-          partialUpdateMode);
+          partialUpdateModeOpt);
     }
 
     Schema schema = 
AvroSchemaCache.intern(recordsIteratorSchemaPair.getRight());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
index 51a48e34e8a4..415a5731e8af 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
@@ -73,12 +73,12 @@ public class PositionBasedFileGroupRecordBuffer<T> extends 
KeyBasedFileGroupReco
   public PositionBasedFileGroupRecordBuffer(HoodieReaderContext<T> 
readerContext,
                                             HoodieTableMetaClient 
hoodieTableMetaClient,
                                             RecordMergeMode recordMergeMode,
-                                            PartialUpdateMode 
partialUpdateMode,
+                                            Option<PartialUpdateMode> 
partialUpdateModeOpt,
                                             String baseFileInstantTime,
                                             TypedProperties props,
                                             List<String> orderingFieldNames,
                                             UpdateProcessor<T> 
updateProcessor) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, 
partialUpdateMode, props, orderingFieldNames, updateProcessor);
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, 
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
     this.baseFileInstantTime = baseFileInstantTime;
   }
 
@@ -124,7 +124,7 @@ public class PositionBasedFileGroupRecordBuffer<T> extends 
KeyBasedFileGroupReco
           readerSchema,
           payloadClasses,
           props,
-          partialUpdateMode);
+          partialUpdateModeOpt);
     }
 
     Pair<Function<T, T>, Schema> schemaTransformerWithEvolvedSchema = 
getSchemaTransformerWithEvolvedSchema(dataBlock);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableFileGroupRecordBufferLoader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableFileGroupRecordBufferLoader.java
index deedd4808b0e..aa3fbd22cf94 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableFileGroupRecordBufferLoader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableFileGroupRecordBufferLoader.java
@@ -59,17 +59,17 @@ public class ReusableFileGroupRecordBufferLoader<T> extends 
LogScanningRecordBuf
                                                                                
          HoodieReadStats readStats,
                                                                                
          Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) {
     UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats, 
readerContext, readerParameters.emitDeletes(), fileGroupUpdateCallback, props);
-    PartialUpdateMode partialUpdateMode = 
hoodieTableMetaClient.getTableConfig().getPartialUpdateMode();
+    Option<PartialUpdateMode> partialUpdateModeOpt = 
hoodieTableMetaClient.getTableConfig().getPartialUpdateMode();
     if (cachedResults == null) {
       // Create an initial buffer to process the log files
       KeyBasedFileGroupRecordBuffer<T> initialBuffer = new 
KeyBasedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateMode, props, orderingFieldNames, updateProcessor);
+          readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
       List<String> validInstants = scanLogFiles(readerContextWithoutFilters, 
storage, inputSplit, hoodieTableMetaClient, props, readerParameters, readStats, 
initialBuffer);
       cachedResults = Pair.of(initialBuffer, validInstants);
     }
     // Create a reusable buffer with the results from the initial scan
     ReusableKeyBasedRecordBuffer<T> reusableBuffer = new 
ReusableKeyBasedRecordBuffer<>(
-        readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateMode, props, orderingFieldNames, updateProcessor, 
cachedResults.getLeft().getLogRecords());
+        readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor, 
cachedResults.getLeft().getLogRecords());
     return Pair.of(reusableBuffer, cachedResults.getRight());
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableKeyBasedRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableKeyBasedRecordBuffer.java
index fef96a8c21cf..bce8c7f9e74f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableKeyBasedRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableKeyBasedRecordBuffer.java
@@ -55,10 +55,10 @@ public class ReusableKeyBasedRecordBuffer<T> extends 
FileGroupRecordBuffer<T> {
   private final Map<Serializable, BufferedRecord<T>> existingRecords;
 
   ReusableKeyBasedRecordBuffer(HoodieReaderContext<T> readerContext, 
HoodieTableMetaClient hoodieTableMetaClient,
-                               RecordMergeMode recordMergeMode, 
PartialUpdateMode partialUpdateMode,
+                               RecordMergeMode recordMergeMode, 
Option<PartialUpdateMode> partialUpdateModeOpt,
                                TypedProperties props, List<String> 
orderingFieldNames,
                                UpdateProcessor<T> updateProcessor, 
Map<Serializable, BufferedRecord<T>> records) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, 
partialUpdateMode, props, orderingFieldNames, updateProcessor);
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, 
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
     this.existingRecords = records;
     ValidationUtils.checkArgument(readerContext.getKeyFilterOpt().orElse(null) 
instanceof Predicates.In,
         () -> "Key filter should be of type Predicates.In, but found: " + 
readerContext.getKeyFilterOpt().map(filter -> 
filter.getClass().getSimpleName()).orElse("NULL"));
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/SortedKeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/SortedKeyBasedFileGroupRecordBuffer.java
index c40dfca70804..686b044a6220 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/SortedKeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/SortedKeyBasedFileGroupRecordBuffer.java
@@ -49,11 +49,11 @@ class SortedKeyBasedFileGroupRecordBuffer<T> extends 
KeyBasedFileGroupRecordBuff
   SortedKeyBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
                                              HoodieTableMetaClient 
hoodieTableMetaClient,
                                              RecordMergeMode recordMergeMode,
-                                             PartialUpdateMode 
partialUpdateMode,
+                                             Option<PartialUpdateMode> 
partialUpdateModeOpt,
                                              TypedProperties props,
                                              List<String> orderingFieldNames,
                                              UpdateProcessor<T> 
updateProcessor) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, 
partialUpdateMode, props, orderingFieldNames, updateProcessor);
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, 
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
index ebe09133ab5c..434af97c1fb1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
@@ -67,15 +67,15 @@ public class StreamingFileGroupRecordBufferLoader<T> 
implements FileGroupRecordB
                                                                             
Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) {
     Schema recordSchema = 
HoodieAvroUtils.removeMetadataFields(readerContext.getSchemaHandler().getRequestedSchema());
     HoodieTableConfig tableConfig = hoodieTableMetaClient.getTableConfig();
-    PartialUpdateMode partialUpdateMode = tableConfig.getPartialUpdateMode();
+    Option<PartialUpdateMode> partialUpdateModeOpt = 
tableConfig.getPartialUpdateMode();
     UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats, 
readerContext, readerParameters.emitDeletes(), fileGroupUpdateCallback, props);
     FileGroupRecordBuffer<T> recordBuffer;
     if (readerParameters.sortOutputs()) {
       recordBuffer = new SortedKeyBasedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateMode, props, orderingFieldNames, updateProcessor);
+          readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
     } else {
       recordBuffer = new KeyBasedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateMode, props, orderingFieldNames, updateProcessor);
+          readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
     }
 
     RecordContext<T> recordContext = readerContext.getRecordContext();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/UnmergedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/UnmergedFileGroupRecordBuffer.java
index 3574ef35552e..580cb88e174b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/UnmergedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/UnmergedFileGroupRecordBuffer.java
@@ -53,10 +53,10 @@ class UnmergedFileGroupRecordBuffer<T> extends 
FileGroupRecordBuffer<T> {
       HoodieReaderContext<T> readerContext,
       HoodieTableMetaClient hoodieTableMetaClient,
       RecordMergeMode recordMergeMode,
-      PartialUpdateMode partialUpdateMode,
+      Option<PartialUpdateMode> partialUpdateModeOpt,
       TypedProperties props,
       HoodieReadStats readStats) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, 
partialUpdateMode, props, Collections.emptyList(), null);
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, 
partialUpdateModeOpt, props, Collections.emptyList(), null);
     this.readStats = readStats;
     this.currentInstantLogBlocks = new ArrayDeque<>();
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 494be05327c0..aa993624dbd0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -75,7 +75,6 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.PartialUpdateMode;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
 import org.apache.hudi.common.table.read.UpdateProcessor;
@@ -1047,7 +1046,7 @@ public class HoodieTableMetadataUtil {
       readerContext.setSchemaHandler(new 
FileGroupReaderSchemaHandler<>(readerContext, writerSchemaOpt.get(), 
writerSchemaOpt.get(), Option.empty(), properties, datasetMetaClient));
       HoodieReadStats readStats = new HoodieReadStats();
       KeyBasedFileGroupRecordBuffer<T> recordBuffer = new 
KeyBasedFileGroupRecordBuffer<>(readerContext, datasetMetaClient,
-          readerContext.getMergeMode(), PartialUpdateMode.NONE, properties, 
tableConfig.getPreCombineFields(),
+          readerContext.getMergeMode(), Option.empty(), properties, 
tableConfig.getPreCombineFields(),
           UpdateProcessor.create(readStats, readerContext, true, 
Option.empty(), properties));
 
       // CRITICAL: Ensure allowInflightInstants is set to true
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateStrategy.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateHandler.java
similarity index 82%
rename from 
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateStrategy.java
rename to 
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateHandler.java
index ca4b099b9484..008c70916fae 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateStrategy.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateHandler.java
@@ -29,18 +29,18 @@ import java.util.Map;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-class TestPartialUpdateStrategy {
+class TestPartialUpdateHandler {
   @Test
   void testEmptyProperties() {
     TypedProperties props = new TypedProperties();
-    Map<String, String> result = 
PartialUpdateStrategy.parseMergeProperties(props);
+    Map<String, String> result = 
PartialUpdateHandler.parseMergeProperties(props);
     assertTrue(result.isEmpty());
   }
 
   @Test
   void testDirectMatch() {
     Schema stringSchema = Schema.create(Schema.Type.STRING);
-    assertTrue(PartialUpdateStrategy.hasTargetType(stringSchema, 
Schema.Type.STRING));
+    assertTrue(PartialUpdateHandler.hasTargetType(stringSchema, 
Schema.Type.STRING));
   }
 
   @Test
@@ -50,7 +50,7 @@ class TestPartialUpdateStrategy {
         Schema.create(Schema.Type.BOOLEAN),
         Schema.create(Schema.Type.STRING)
     );
-    assertTrue(PartialUpdateStrategy.hasTargetType(unionSchema, 
Schema.Type.STRING));
+    assertTrue(PartialUpdateHandler.hasTargetType(unionSchema, 
Schema.Type.STRING));
   }
 
   @Test
@@ -60,12 +60,12 @@ class TestPartialUpdateStrategy {
         Schema.create(Schema.Type.BOOLEAN),
         Schema.create(Schema.Type.INT)
     );
-    assertFalse(PartialUpdateStrategy.hasTargetType(unionSchema, 
Schema.Type.STRING));
+    assertFalse(PartialUpdateHandler.hasTargetType(unionSchema, 
Schema.Type.STRING));
   }
 
   @Test
   void testNonUnionNonTargetType() {
     Schema intSchema = Schema.create(Schema.Type.INT);
-    assertFalse(PartialUpdateStrategy.hasTargetType(intSchema, 
Schema.Type.STRING));
+    assertFalse(PartialUpdateHandler.hasTargetType(intSchema, 
Schema.Type.STRING));
   }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
index a0c0b27ef537..579f6eaf0b49 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
@@ -32,7 +32,6 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.PartialUpdateMode;
 import org.apache.hudi.common.table.read.DeleteContext;
 import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
 import org.apache.hudi.common.table.read.HoodieReadStats;
@@ -125,7 +124,7 @@ public class BaseTestFileGroupRecordBuffer {
 
     if (fileGroupRecordBufferItrOpt.isEmpty()) {
       return new KeyBasedFileGroupRecordBuffer<>(
-          readerContext, mockMetaClient, recordMergeMode, 
PartialUpdateMode.NONE, props, orderingFieldNames, updateProcessor);
+          readerContext, mockMetaClient, recordMergeMode, Option.empty(), 
props, orderingFieldNames, updateProcessor);
     } else {
       FileGroupRecordBufferLoader recordBufferLoader = 
FileGroupRecordBufferLoader.createStreamingRecordsBufferLoader();
       InputSplit inputSplit = mock(InputSplit.class);
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBuffer.java
index 67ace014bd61..330a57f19794 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBuffer.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.engine.RecordContext;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.serialization.DefaultSerializer;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.PartialUpdateMode;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.table.read.BufferedRecords;
 import org.apache.hudi.common.table.read.DeleteContext;
@@ -161,7 +160,7 @@ class TestFileGroupRecordBuffer {
             readerContext,
             hoodieTableMetaClient,
             RecordMergeMode.COMMIT_TIME_ORDERING,
-            PartialUpdateMode.NONE,
+            Option.empty(),
             props,
             Collections.emptyList(),
             updateProcessor
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
index 67b03db2c0b2..649b6ee1720c 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
@@ -63,6 +63,7 @@ public class TestFileGroupRecordBufferLoader extends 
BaseTestFileGroupRecordBuff
     HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
     
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.COMMIT_TIME_ORDERING);
     when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.NINE);
+    when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
     when(tableConfig.getPreCombineFieldsStr()).thenReturn(Option.empty());
     when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
     StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
index b4011a7205a5..4e85f74bbd37 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
@@ -30,7 +30,6 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
-import org.apache.hudi.common.table.PartialUpdateMode;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
@@ -146,7 +145,7 @@ class TestKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecordBuffer {
     properties.setProperty(DELETE_MARKER, "3");
     HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
     
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.EVENT_TIME_ORDERING);
-    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+    when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
     
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
     when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
     StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
@@ -213,7 +212,7 @@ class TestKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecordBuffer {
     properties.setProperty(DELETE_MARKER, "3");
     HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
     
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.COMMIT_TIME_ORDERING);
-    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+    when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
     
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
     when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
     StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
@@ -247,7 +246,7 @@ class TestKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecordBuffer {
     
when(tableConfig.getPayloadClass()).thenReturn(CustomPayload.class.getName());
     when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
     when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
-    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+    when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
     StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
     HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), 
Option.empty());
     KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer = 
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, new 
HoodieAvroRecordMerger(),
@@ -292,7 +291,7 @@ class TestKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecordBuffer {
     
when(tableConfig.getPayloadClass()).thenReturn(TestKeyBasedFileGroupRecordBuffer.CustomPayload.class.getName());
     when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
     when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
-    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+    when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
     
when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
     
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
     StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
@@ -366,7 +365,7 @@ class TestKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecordBuffer {
     
when(tableConfig.getPayloadClass()).thenReturn(CustomPayload.class.getName());
     when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
     when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
-    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+    when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
     
when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
     
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestReusableKeyBasedRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestReusableKeyBasedRecordBuffer.java
index 9f3f6d8d44e1..50a8f1b43dd0 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestReusableKeyBasedRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestReusableKeyBasedRecordBuffer.java
@@ -22,7 +22,6 @@ import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.PartialUpdateMode;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.table.read.HoodieReadStats;
 import org.apache.hudi.common.table.read.UpdateProcessor;
@@ -90,7 +89,7 @@ class TestReusableKeyBasedRecordBuffer {
     
when(mockReaderContext.getRecordContext().seal(any())).thenAnswer(invocation -> 
invocation.getArgument(0));
 
     ReusableKeyBasedRecordBuffer<TestRecord> buffer = new 
ReusableKeyBasedRecordBuffer<>(mockReaderContext, metaClient,
-        RecordMergeMode.EVENT_TIME_ORDERING, PartialUpdateMode.NONE, new 
TypedProperties(), Collections.singletonList("value"), updateProcessor, 
preMergedLogRecords);
+        RecordMergeMode.EVENT_TIME_ORDERING, Option.empty(), new 
TypedProperties(), Collections.singletonList("value"), updateProcessor, 
preMergedLogRecords);
 
     List<TestRecord> baseFileRecords = Arrays.asList(new TestRecord("1", 10), 
new TestRecord("3", 30));
     
buffer.setBaseFileIterator(ClosableIterator.wrap(baseFileRecords.iterator()));
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
index a1ec3f66caca..1f2394db848e 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
@@ -133,7 +133,7 @@ class TestSortedKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecordBuf
     HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class, 
RETURNS_DEEP_STUBS);
     when(mockMetaClient.getTableConfig()).thenReturn(tableConfig);
     
when(tableConfig.getPayloadClass()).thenReturn(DefaultHoodieRecordPayload.class.getName());
-    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+    when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
 
     FileGroupRecordBufferLoader recordBufferLoader = 
FileGroupRecordBufferLoader.createStreamingRecordsBufferLoader();
     InputSplit inputSplit = mock(InputSplit.class);
@@ -199,12 +199,12 @@ class TestSortedKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecordBuf
     
when(mockReaderContext.getRecordContext().seal(any())).thenAnswer(invocation -> 
invocation.getArgument(0));
     HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
     RecordMergeMode recordMergeMode = RecordMergeMode.COMMIT_TIME_ORDERING;
-    PartialUpdateMode partialUpdateMode = PartialUpdateMode.NONE;
+    Option<PartialUpdateMode> partialUpdateModeOpt = Option.empty();
     TypedProperties props = new TypedProperties();
     
when(mockReaderContext.getPayloadClasses(any())).thenReturn(Option.empty());
     UpdateProcessor<TestRecord> updateProcessor = 
UpdateProcessor.create(readStats, mockReaderContext, false, Option.empty(), 
props);
     return new SortedKeyBasedFileGroupRecordBuffer<>(
-        mockReaderContext, mockMetaClient, recordMergeMode, partialUpdateMode, 
props, Collections.emptyList(), updateProcessor);
+        mockReaderContext, mockMetaClient, recordMergeMode, 
partialUpdateModeOpt, props, Collections.emptyList(), updateProcessor);
   }
 
   private static <T> List<T> 
getActualRecordsForSortedKeyBased(SortedKeyBasedFileGroupRecordBuffer<T> 
fileGroupRecordBuffer) throws IOException {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
index 8c7e5d65a530..ee77ee1bf31c 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
-import org.apache.hudi.common.table.PartialUpdateMode;
 import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
 import org.apache.hudi.common.table.read.HoodieReadStats;
 import org.apache.hudi.common.util.Option;
@@ -76,7 +75,7 @@ class TestStreamingKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecord
     properties.setProperty(DELETE_MARKER, "3");
     HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
     
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.EVENT_TIME_ORDERING);
-    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+    when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
     
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
     when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
     StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
@@ -113,7 +112,7 @@ class TestStreamingKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecord
     properties.setProperty(DELETE_MARKER, "3");
     HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
     
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.COMMIT_TIME_ORDERING);
-    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+    when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
     
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
     when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
     StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
@@ -152,7 +151,7 @@ class TestStreamingKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecord
     
when(tableConfig.getPayloadClass()).thenReturn(CustomPayload.class.getName());
     when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
     when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
-    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+    when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
     
when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
     
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
     StorageConfiguration<?> storageConfiguration = 
mock(StorageConfiguration.class);
@@ -189,7 +188,7 @@ class TestStreamingKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecord
     
when(tableConfig.getPayloadClass()).thenReturn(CustomPayload.class.getName());
     when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] 
{"record_key"}));
     when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
-    
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+    when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
     
when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
     
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
 
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index f09fc348b101..a25856ca3bd8 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -170,7 +170,7 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness 
{
 
     assertTrue(storage.exists(cfgPath));
     assertFalse(storage.exists(backupCfgPath));
-    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath, null, 
null, null);
+    HoodieTableConfig config = new HoodieTableConfig(storage, metaPath);
     assertEquals(9, config.getProps().size());
     assertEquals("test-table2", config.getTableName());
     assertEquals(Collections.singletonList("new_field"), 
config.getPreCombineFields());
@@ -184,7 +184,7 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness 
{
     // delete a non existant property as well
     propsToDelete.add(HoodieTableConfig.RECORDKEY_FIELDS.key());
     HoodieTableConfig.updateAndDeleteProps(storage, metaPath, updatedProps, 
propsToDelete);
-    config = new HoodieTableConfig(storage, metaPath, null, null, null);
+    config = new HoodieTableConfig(storage, metaPath);
     assertEquals(8, config.getProps().size());
     assertEquals("test-table2", config.getTableName());
     assertEquals(Collections.singletonList("new_field2"), 
config.getPreCombineFields());
@@ -193,7 +193,7 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness 
{
     // just delete 1 property w/o updating anything.
     updatedProps = new Properties();
     HoodieTableConfig.updateAndDeleteProps(storage, metaPath, updatedProps, 
Collections.singleton(HoodieTableConfig.PRECOMBINE_FIELDS.key()));
-    config = new HoodieTableConfig(storage, metaPath, null, null, null);
+    config = new HoodieTableConfig(storage, metaPath);
     assertEquals(7, config.getProps().size());
     assertEquals("test-table2", config.getTableName());
     assertTrue(config.getPreCombineFields().isEmpty());
@@ -608,7 +608,7 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness 
{
         // Test case: Version 9 table with PostgresDebeziumAvroPayload (should 
set partial update mode and custom properties)
         arguments("Version 9 with PostgresDebeziumAvroPayload", null, 
PostgresDebeziumAvroPayload.class.getName(), null, "ts", 
HoodieTableVersion.NINE,
             5, EVENT_TIME_ORDERING.name(), null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
PostgresDebeziumAvroPayload.class.getName(),
-            PartialUpdateMode.IGNORE_MARKERS.name(), 
HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE, null, null),
+            PartialUpdateMode.FILL_UNAVAILABLE.name(), 
HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE, null, null),
 
         // Test case: Version 9 table with AWSDmsAvroPayload (should set 
custom delete properties)
         arguments("Version 9 with AWSDmsAvroPayload", null, 
AWSDmsAvroPayload.class.getName(), null, null, HoodieTableVersion.NINE,
@@ -674,11 +674,11 @@ class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
 
       if (expectedDebeziumMarker != null) {
         assertEquals(expectedDebeziumMarker, configs.get(
-                HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER),
+                HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE),
             "Debezium marker mismatch for: " + testName);
       } else {
-        
assertFalse(configs.containsKey(HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX 
+ HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER),
-            "Custom merge property " + 
HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER + " not expected to be set");
+        
assertFalse(configs.containsKey(HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX 
+ HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE),
+            "Custom merge property " + 
HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE + " not expected to be set");
       }
 
       if (expectedDeleteKey != null) {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index e232bf953e85..c0872b9f7fbe 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -89,12 +89,12 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
   } else {
     Option.empty.asInstanceOf[Option[String]]
   }
-  private lazy val partialUpdateMode: PartialUpdateMode = 
metaClient.getTableConfig.getPartialUpdateMode
+  private lazy val partialUpdateModeOpt: Option[PartialUpdateMode] = 
metaClient.getTableConfig.getPartialUpdateMode
   private var isPartialMergeEnabled = false
   private var bufferedRecordMerger = getBufferedRecordMerger
   private def getBufferedRecordMerger: BufferedRecordMerger[InternalRow] = 
BufferedRecordMergerFactory.create(readerContext,
     readerContext.getMergeMode, isPartialMergeEnabled, 
Option.of(recordMerger), orderingFieldNames,
-    payloadClass, avroSchema, props, partialUpdateMode)
+    payloadClass, avroSchema, props, partialUpdateModeOpt)
 
   private lazy val storage = metaClient.getStorage
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
index a5b2b996d9f6..cddfb1f6d4b4 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
@@ -75,7 +75,7 @@ import java.util.stream.Stream;
 import static 
org.apache.hudi.BaseSparkInternalRecordContext.getFieldValueFromInternalRow;
 import static 
org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING;
 import static 
org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
-import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE;
 import static org.apache.hudi.common.table.HoodieTableConfig.PRECOMBINE_FIELDS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -124,9 +124,9 @@ class TestBufferedRecordMerger extends 
SparkClientFunctionalTestHarness {
   @ParameterizedTest
   @MethodSource("mergeModeAndStageProvider")
   void testRegularMerging(RecordMergeMode mergeMode, PartialUpdateMode 
updateMode, MergeStage stage) throws IOException {
-    if (updateMode == PartialUpdateMode.IGNORE_MARKERS) {
+    if (updateMode == PartialUpdateMode.FILL_UNAVAILABLE) {
       props.put(
-          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
PARTIAL_UPDATE_CUSTOM_MARKER,
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
PARTIAL_UPDATE_UNAVAILABLE_VALUE,
           IGNORE_MARKERS_VALUE);
     }
 
@@ -140,7 +140,7 @@ class TestBufferedRecordMerger extends 
SparkClientFunctionalTestHarness {
   }
 
   private void runDeltaMerge(RecordMergeMode mergeMode, PartialUpdateMode 
updateMode) throws IOException {
-    BufferedRecordMerger<InternalRow> merger = createMerger(readerContext, 
mergeMode, updateMode);
+    BufferedRecordMerger<InternalRow> merger = createMerger(readerContext, 
mergeMode, Option.of(updateMode));
     // Create records with all columns.
     InternalRow oldRecord = createFullRecord("old_id", "Old Name", 25, "Old 
City", 1000L);
     InternalRow newRecord = createFullRecord("new_id", "New Name", 0, 
IGNORE_MARKERS_VALUE, 0L);
@@ -153,16 +153,16 @@ class TestBufferedRecordMerger extends 
SparkClientFunctionalTestHarness {
     Option<BufferedRecord<InternalRow>> deltaResult = 
merger.deltaMerge(newBufferedRecord, oldBufferedRecord);
     if (mergeMode == COMMIT_TIME_ORDERING) {
       assertTrue(deltaResult.isPresent());
-      if (updateMode == PartialUpdateMode.NONE) {
+      if (updateMode == null) {
         assertEquals(newRecord, deltaResult.get().getRecord());
       } else if (updateMode == PartialUpdateMode.IGNORE_DEFAULTS) {
         assertEquals(25, deltaResult.get().getRecord().getInt(2));
         assertEquals(1000L, deltaResult.get().getRecord().getLong(4));
-      } else if (updateMode == PartialUpdateMode.IGNORE_MARKERS) {
+      } else if (updateMode == PartialUpdateMode.FILL_UNAVAILABLE) {
         assertEquals("Old City", deltaResult.get().getRecord().getString(3));
       }
     } else {
-      if (updateMode == PartialUpdateMode.NONE) {
+      if (updateMode == null) {
         assertTrue(deltaResult.isEmpty());
       } else if (updateMode == PartialUpdateMode.IGNORE_DEFAULTS) {
         assertTrue(deltaResult.isPresent());
@@ -175,12 +175,12 @@ class TestBufferedRecordMerger extends 
SparkClientFunctionalTestHarness {
     newBufferedRecord = new BufferedRecord<>(RECORD_KEY, ORDERING_VALUE + 1, 
newRecord, 1, null);
     deltaResult = merger.deltaMerge(newBufferedRecord, oldBufferedRecord);
     assertTrue(deltaResult.isPresent());
-    if (updateMode == PartialUpdateMode.NONE) {
+    if (updateMode == null) {
       assertEquals(newRecord, deltaResult.get().getRecord());
     } else if (updateMode == PartialUpdateMode.IGNORE_DEFAULTS) {
       assertEquals(25, deltaResult.get().getRecord().getInt(2));
       assertEquals(1000L, deltaResult.get().getRecord().getLong(4));
-    } else if (updateMode == PartialUpdateMode.IGNORE_MARKERS) {
+    } else if (updateMode == PartialUpdateMode.FILL_UNAVAILABLE) {
       assertEquals("Old City", deltaResult.get().getRecord().getString(3));
     }
 
@@ -189,18 +189,18 @@ class TestBufferedRecordMerger extends 
SparkClientFunctionalTestHarness {
     newBufferedRecord = new BufferedRecord<>(RECORD_KEY, ORDERING_VALUE, 
newRecord, 1, null);
     deltaResult = merger.deltaMerge(newBufferedRecord, oldBufferedRecord);
     assertTrue(deltaResult.isPresent());
-    if (updateMode == PartialUpdateMode.NONE) {
+    if (updateMode == null) {
       assertEquals(newRecord, deltaResult.get().getRecord());
     } else if (updateMode == PartialUpdateMode.IGNORE_DEFAULTS) {
       assertEquals(25, deltaResult.get().getRecord().getInt(2));
       assertEquals(1000L, deltaResult.get().getRecord().getLong(4));
-    } else if (updateMode == PartialUpdateMode.IGNORE_MARKERS) {
+    } else if (updateMode == PartialUpdateMode.FILL_UNAVAILABLE) {
       assertEquals("Old City", deltaResult.get().getRecord().getString(3));
     }
   }
 
   private void runDeltaDeleteMerge(RecordMergeMode mergeMode, 
PartialUpdateMode updateMode) throws IOException {
-    BufferedRecordMerger<InternalRow> merger = createMerger(readerContext, 
mergeMode, updateMode);
+    BufferedRecordMerger<InternalRow> merger = createMerger(readerContext, 
mergeMode, Option.ofNullable(updateMode));
     // Create records with all columns.
     InternalRow oldRecord = createFullRecord("old_id", "Old Name", 25, "Old 
City", 1000L);
     InternalRow newRecord = createFullRecord("new_id", "New Name", 0, 
IGNORE_MARKERS_VALUE, 0L);
@@ -231,7 +231,7 @@ class TestBufferedRecordMerger extends 
SparkClientFunctionalTestHarness {
   }
 
   private void runFinalMerge(RecordMergeMode mergeMode, PartialUpdateMode 
updateMode) throws IOException {
-    BufferedRecordMerger<InternalRow> merger = createMerger(readerContext, 
mergeMode, updateMode);
+    BufferedRecordMerger<InternalRow> merger = createMerger(readerContext, 
mergeMode, Option.ofNullable(updateMode));
     InternalRow oldRecord = createFullRecord(
         "older_id", "Older Name", 20, "Older City", 500L);
     InternalRow newRecord = createFullRecord(
@@ -245,12 +245,12 @@ class TestBufferedRecordMerger extends 
SparkClientFunctionalTestHarness {
     BufferedRecord<InternalRow> finalResult = 
merger.finalMerge(olderBufferedRecord, newerBufferedRecord);
     assertFalse(finalResult.isDelete());
     if (mergeMode == COMMIT_TIME_ORDERING) {
-      if (updateMode == PartialUpdateMode.NONE) {
+      if (updateMode == null) {
         assertEquals(newRecord, finalResult.getRecord());
       } else if (updateMode == PartialUpdateMode.IGNORE_DEFAULTS) {
         assertEquals(20, finalResult.getRecord().getInt(2));
         assertEquals(500L, finalResult.getRecord().getLong(4));
-      } else if (updateMode == PartialUpdateMode.IGNORE_MARKERS) {
+      } else if (updateMode == PartialUpdateMode.FILL_UNAVAILABLE) {
         assertEquals("Older City", finalResult.getRecord().getString(3));
       }
     } else {
@@ -262,12 +262,12 @@ class TestBufferedRecordMerger extends 
SparkClientFunctionalTestHarness {
     newerBufferedRecord = new BufferedRecord<>(RECORD_KEY, ORDERING_VALUE + 1, 
newRecord, 1, null);
     finalResult = merger.finalMerge(olderBufferedRecord, newerBufferedRecord);
     assertFalse(finalResult.isDelete());
-    if (updateMode == PartialUpdateMode.NONE) {
+    if (updateMode == null) {
       assertEquals(newRecord, finalResult.getRecord());
     } else if (updateMode == PartialUpdateMode.IGNORE_DEFAULTS) {
       assertEquals(20, finalResult.getRecord().getInt(2));
       assertEquals(500, finalResult.getRecord().getLong(4));
-    } else if (updateMode == PartialUpdateMode.IGNORE_MARKERS) {
+    } else if (updateMode == PartialUpdateMode.FILL_UNAVAILABLE) {
       assertEquals("Older City", finalResult.getRecord().getString(3));
     }
 
@@ -276,12 +276,12 @@ class TestBufferedRecordMerger extends 
SparkClientFunctionalTestHarness {
     newerBufferedRecord = new BufferedRecord<>(RECORD_KEY, ORDERING_VALUE, 
newRecord, 1, null);
     finalResult = merger.finalMerge(olderBufferedRecord, newerBufferedRecord);
     assertFalse(finalResult.isDelete());
-    if (updateMode == PartialUpdateMode.NONE) {
+    if (updateMode == null) {
       assertEquals(newRecord, finalResult.getRecord());
     } else if (updateMode == PartialUpdateMode.IGNORE_DEFAULTS) {
       assertEquals(20, finalResult.getRecord().getInt(2));
       assertEquals(500, finalResult.getRecord().getLong(4));
-    } else if (updateMode == PartialUpdateMode.IGNORE_MARKERS) {
+    } else if (updateMode == PartialUpdateMode.FILL_UNAVAILABLE) {
       assertEquals("Older City", finalResult.getRecord().getString(3));
     }
   }
@@ -381,7 +381,7 @@ class TestBufferedRecordMerger extends 
SparkClientFunctionalTestHarness {
   @EnumSource(value = RecordMergeMode.class, names = {"COMMIT_TIME_ORDERING", 
"EVENT_TIME_ORDERING"})
   void testRegularMergingWithIgnoreDefaultsNested(RecordMergeMode mergeMode) 
throws IOException {
     BufferedRecordMerger<InternalRow> ignoreDefaultsMerger =
-        createMerger(readerContext, mergeMode, 
PartialUpdateMode.IGNORE_DEFAULTS);
+        createMerger(readerContext, mergeMode, 
Option.of(PartialUpdateMode.IGNORE_DEFAULTS));
 
     // Old record has all columns, new record has some columns with 
default/null values
     InternalRow oldRecordWithDefaults = createFullRecordWithCompany(
@@ -412,7 +412,7 @@ class TestBufferedRecordMerger extends 
SparkClientFunctionalTestHarness {
   @EnumSource(value = RecordMergeMode.class, names = {"COMMIT_TIME_ORDERING", 
"EVENT_TIME_ORDERING"})
   void testDeltaMergeWithNullExistingRecord(RecordMergeMode mergeMode) throws 
IOException {
     BufferedRecordMerger<InternalRow> merger = createMerger(
-        readerContext, mergeMode, PartialUpdateMode.NONE);
+        readerContext, mergeMode, Option.empty());
 
     // New record is not delete.
     InternalRow newRecord = createFullRecord(
@@ -449,7 +449,7 @@ class TestBufferedRecordMerger extends 
SparkClientFunctionalTestHarness {
   @ParameterizedTest
   @EnumSource(value = RecordMergeMode.class, names = {"COMMIT_TIME_ORDERING", 
"EVENT_TIME_ORDERING"})
   void testDeltaMergeWithDeleteRecord(RecordMergeMode mergeMode) {
-    BufferedRecordMerger<InternalRow> merger = createMerger(readerContext, 
mergeMode, PartialUpdateMode.NONE);
+    BufferedRecordMerger<InternalRow> merger = createMerger(readerContext, 
mergeMode, Option.empty());
 
     // Delete record has null ordering value.
     InternalRow oldRecord = createFullRecord("old_id", "Old Name", 25, "Old 
City", 1000L);
@@ -553,7 +553,7 @@ class TestBufferedRecordMerger extends 
SparkClientFunctionalTestHarness {
   @EnumSource(value = RecordMergeMode.class, names = {"COMMIT_TIME_ORDERING", 
"EVENT_TIME_ORDERING"})
   void testFinalMergeWithDeleteRecords(RecordMergeMode mergeMode) throws 
IOException {
     BufferedRecordMerger<InternalRow> merger = createMerger(
-        readerContext, mergeMode, PartialUpdateMode.NONE);
+        readerContext, mergeMode, Option.empty());
 
     InternalRow oldRecord = createFullRecord("old_id", "Old Name", 25, "Old 
City", 1000L);
     InternalRow newRecord = createFullRecord("new_id", "New Name", 29, "New 
City", 2000L);
@@ -608,7 +608,7 @@ class TestBufferedRecordMerger extends 
SparkClientFunctionalTestHarness {
         )
         .filter(args -> {
           PartialUpdateMode updateMode = (PartialUpdateMode) args.get()[1];
-          return updateMode != PartialUpdateMode.FILL_DEFAULTS && updateMode 
!= PartialUpdateMode.KEEP_VALUES;
+          return updateMode != PartialUpdateMode.IGNORE_DEFAULTS;
         });
   }
 
@@ -661,7 +661,7 @@ class TestBufferedRecordMerger extends 
SparkClientFunctionalTestHarness {
 
   private BufferedRecordMerger<InternalRow> 
createMerger(HoodieReaderContext<InternalRow> readerContext,
                                                          RecordMergeMode 
mergeMode,
-                                                         PartialUpdateMode 
partialUpdateMode) {
+                                                         
Option<PartialUpdateMode> partialUpdateModeOpt) {
     return BufferedRecordMergerFactory.create(
         readerContext,
         mergeMode,
@@ -673,7 +673,7 @@ class TestBufferedRecordMerger extends 
SparkClientFunctionalTestHarness {
         Option.empty(), // payloadClass
         READER_SCHEMA, // readerSchema
         props, // props
-        partialUpdateMode
+        partialUpdateModeOpt
     );
   }
 
@@ -690,7 +690,7 @@ class TestBufferedRecordMerger extends 
SparkClientFunctionalTestHarness {
         Option.empty(), // payloadClass
         READER_SCHEMA, // readerSchema
         props, // props
-        PartialUpdateMode.KEEP_VALUES // partialUpdateMode
+        Option.of(PartialUpdateMode.IGNORE_DEFAULTS) // partialUpdateMode
     );
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala
index b6ac8ad23f84..283964625047 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala
@@ -25,13 +25,13 @@ import org.apache.hudi.common.model.{AWSDmsAvroPayload, 
EventTimeAvroPayload, Ho
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload.{DELETE_KEY, 
DELETE_MARKER}
 import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
HoodieTableVersion, PartialUpdateMode}
-import 
org.apache.hudi.common.table.HoodieTableConfig.{DEBEZIUM_UNAVAILABLE_VALUE, 
PARTIAL_UPDATE_CUSTOM_MARKER, RECORD_MERGE_PROPERTY_PREFIX}
+import 
org.apache.hudi.common.table.HoodieTableConfig.{DEBEZIUM_UNAVAILABLE_VALUE, 
PARTIAL_UPDATE_UNAVAILABLE_VALUE, RECORD_MERGE_PROPERTY_PREFIX}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, 
UpgradeDowngrade}
 
 import org.apache.spark.sql.SaveMode
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, MethodSource}
 
@@ -112,8 +112,8 @@ class TestEightToNineUpgrade extends 
RecordLevelIndexTestBase {
     assertEquals(HoodieTableVersion.EIGHT, 
metaClient.getTableConfig.getTableVersion)
     // The payload class should be maintained.
     assertEquals(payloadClass, metaClient.getTableConfig.getPayloadClass)
-    // The partial update mode should be NONE.
-    assertEquals(PartialUpdateMode.NONE, 
metaClient.getTableConfig.getPartialUpdateMode)
+    // The partial update mode should not be present
+    assertTrue(metaClient.getTableConfig.getPartialUpdateMode.isEmpty)
     if 
(payloadClass.equals("org.apache.hudi.common.model.EventTimeAvroPayload")) {
       assertEquals(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
metaClient.getTableConfig.getRecordMergeStrategyId)
     } else {
@@ -135,30 +135,33 @@ class TestEightToNineUpgrade extends 
RecordLevelIndexTestBase {
         HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
         metaClient.getTableConfig.getRecordMergeStrategyId)
       assertEquals(RecordMergeMode.EVENT_TIME_ORDERING, 
metaClient.getTableConfig.getRecordMergeMode)
-      assertEquals(PartialUpdateMode.IGNORE_DEFAULTS, 
metaClient.getTableConfig.getPartialUpdateMode)
+      assertEquals(PartialUpdateMode.IGNORE_DEFAULTS, 
metaClient.getTableConfig.getPartialUpdateMode.get())
     } else if 
(payloadClass.equals(classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName))
 {
       assertEquals(
         HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
         metaClient.getTableConfig.getRecordMergeStrategyId)
       assertEquals(RecordMergeMode.COMMIT_TIME_ORDERING, 
metaClient.getTableConfig.getRecordMergeMode)
-      assertEquals(PartialUpdateMode.IGNORE_DEFAULTS, 
metaClient.getTableConfig.getPartialUpdateMode)
+      assertEquals(PartialUpdateMode.IGNORE_DEFAULTS, 
metaClient.getTableConfig.getPartialUpdateMode.get())
     } else if 
(payloadClass.equals(classOf[PostgresDebeziumAvroPayload].getName)) {
       assertEquals(
         HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
         metaClient.getTableConfig.getRecordMergeStrategyId)
       assertEquals(RecordMergeMode.EVENT_TIME_ORDERING, 
metaClient.getTableConfig.getRecordMergeMode)
-      assertEquals(PartialUpdateMode.IGNORE_MARKERS, 
metaClient.getTableConfig.getPartialUpdateMode)
-      val customMarker = 
metaClient.getTableConfig.getString(s"${RECORD_MERGE_PROPERTY_PREFIX}${PARTIAL_UPDATE_CUSTOM_MARKER}")
+      assertEquals(PartialUpdateMode.FILL_UNAVAILABLE, 
metaClient.getTableConfig.getPartialUpdateMode.get())
+      val customMarker = 
metaClient.getTableConfig.getString(s"${RECORD_MERGE_PROPERTY_PREFIX}${PARTIAL_UPDATE_UNAVAILABLE_VALUE}")
       assertEquals(DEBEZIUM_UNAVAILABLE_VALUE, customMarker)
     } else if (payloadClass.equals(classOf[AWSDmsAvroPayload].getName)) {
       assertEquals(
         HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
         metaClient.getTableConfig.getRecordMergeStrategyId)
       assertEquals(RecordMergeMode.COMMIT_TIME_ORDERING, 
metaClient.getTableConfig.getRecordMergeMode)
+      assertTrue(metaClient.getTableConfig.getPartialUpdateMode.isEmpty)
       val deleteField = 
metaClient.getTableConfig.getString(s"${RECORD_MERGE_PROPERTY_PREFIX}${DELETE_KEY}")
       assertEquals(AWSDmsAvroPayload.OP_FIELD, deleteField)
       val deleteMarker = 
metaClient.getTableConfig.getString(s"${RECORD_MERGE_PROPERTY_PREFIX}${DELETE_MARKER}")
       assertEquals(AWSDmsAvroPayload.DELETE_OPERATION_VALUE, deleteMarker)
+    } else {
+      assertTrue(metaClient.getTableConfig.getPartialUpdateMode.isEmpty)
     }
   }
 }
@@ -172,7 +175,7 @@ object TestEightToNineUpgrade {
       Arguments.of("COPY_ON_WRITE", 
classOf[PostgresDebeziumAvroPayload].getName),
       Arguments.of("COPY_ON_WRITE", classOf[AWSDmsAvroPayload].getName),
       Arguments.of("MERGE_ON_READ", classOf[EventTimeAvroPayload].getName),
-      Arguments.of("COPY_ON_WRITE", classOf[PartialUpdateAvroPayload].getName),
+      Arguments.of("MERGE_ON_READ", classOf[PartialUpdateAvroPayload].getName),
       Arguments.of("MERGE_ON_READ", 
classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName),
       Arguments.of("MERGE_ON_READ", 
classOf[PostgresDebeziumAvroPayload].getName),
       Arguments.of("MERGE_ON_READ", classOf[AWSDmsAvroPayload].getName)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
index 1c98a849bb29..b99c1f4159d6 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
@@ -260,7 +260,7 @@ object TestPayloadDeprecationFlow {
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[PostgresDebeziumAvroPayload].getName,
           HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
         HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_MARKERS",
-        HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER
+        HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE
           -> "__debezium_unavailable_value"),
       Arguments.of(
         "COPY_ON_WRITE",
@@ -322,7 +322,7 @@ object TestPayloadDeprecationFlow {
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[PostgresDebeziumAvroPayload].getName,
           HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
           HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_MARKERS",
-          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE
             -> "__debezium_unavailable_value"),
       Arguments.of(
         "MERGE_ON_READ",

Reply via email to