This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0a06b8ebb603 [HUDI-8401] Addressing some follow up feedback on PR
13519 (#13721)
0a06b8ebb603 is described below
commit 0a06b8ebb6032b82be75a16572813c43d073c35d
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Aug 15 22:10:28 2025 -0700
[HUDI-8401] Addressing some follow up feedback on PR 13519 (#13721)
---
.../table/upgrade/EightToNineUpgradeHandler.java | 11 ++---
.../table/upgrade/NineToEightDowngradeHandler.java | 4 +-
.../upgrade/TestEightToNineUpgradeHandler.java | 28 ++++++-----
.../upgrade/TestNineToEightDowngradeHandler.java | 4 +-
.../hudi/common/table/HoodieTableConfig.java | 23 +++++----
.../hudi/common/table/PartialUpdateMode.java | 19 ++------
.../table/read/BufferedRecordMergerFactory.java | 36 +++++++--------
...dateStrategy.java => PartialUpdateHandler.java} | 20 ++++----
.../buffer/DefaultFileGroupRecordBufferLoader.java | 10 ++--
.../table/read/buffer/FileGroupRecordBuffer.java | 8 ++--
.../read/buffer/KeyBasedFileGroupRecordBuffer.java | 6 +--
.../buffer/PositionBasedFileGroupRecordBuffer.java | 6 +--
.../ReusableFileGroupRecordBufferLoader.java | 6 +--
.../read/buffer/ReusableKeyBasedRecordBuffer.java | 4 +-
.../SortedKeyBasedFileGroupRecordBuffer.java | 4 +-
.../StreamingFileGroupRecordBufferLoader.java | 6 +--
.../read/buffer/UnmergedFileGroupRecordBuffer.java | 4 +-
.../hudi/metadata/HoodieTableMetadataUtil.java | 3 +-
...Strategy.java => TestPartialUpdateHandler.java} | 12 ++---
.../read/buffer/BaseTestFileGroupRecordBuffer.java | 3 +-
.../read/buffer/TestFileGroupRecordBuffer.java | 3 +-
.../buffer/TestFileGroupRecordBufferLoader.java | 1 +
.../buffer/TestKeyBasedFileGroupRecordBuffer.java | 11 ++---
.../buffer/TestReusableKeyBasedRecordBuffer.java | 3 +-
.../TestSortedKeyBasedFileGroupRecordBuffer.java | 6 +--
...TestStreamingKeyBasedFileGroupRecordBuffer.java | 9 ++--
.../hudi/common/table/TestHoodieTableConfig.java | 14 +++---
.../org/apache/hudi/cdc/CDCFileGroupIterator.scala | 4 +-
.../hudi/functional/TestBufferedRecordMerger.java | 54 +++++++++++-----------
.../hudi/functional/TestEightToNineUpgrade.scala | 21 +++++----
.../functional/TestPayloadDeprecationFlow.scala | 4 +-
31 files changed, 165 insertions(+), 182 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
index 5d3c54d69d9b..ac4609983427 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
@@ -52,7 +52,7 @@ import static
org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_M
import static
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
import static
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
import static
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
-import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE;
import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
import static
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
@@ -160,9 +160,6 @@ public class EightToNineUpgradeHandler implements
UpgradeHandler {
private void reconcilePartialUpdateModeConfig(Map<ConfigProperty, String>
tablePropsToAdd,
HoodieTableConfig tableConfig)
{
- // Set partial update mode for all tables.
- tablePropsToAdd.put(PARTIAL_UPDATE_MODE, PartialUpdateMode.NONE.name());
// to be fixed once we land PR #13721
-
String payloadClass = tableConfig.getPayloadClass();
String mergeStrategy = tableConfig.getRecordMergeStrategyId();
if (!BUILTIN_MERGE_STRATEGIES.contains(mergeStrategy) ||
StringUtils.isNullOrEmpty(payloadClass)) {
@@ -172,9 +169,7 @@ public class EightToNineUpgradeHandler implements
UpgradeHandler {
|| payloadClass.equals(PartialUpdateAvroPayload.class.getName())) {
tablePropsToAdd.put(PARTIAL_UPDATE_MODE,
PartialUpdateMode.IGNORE_DEFAULTS.name());
} else if
(payloadClass.equals(PostgresDebeziumAvroPayload.class.getName())) {
- tablePropsToAdd.put(PARTIAL_UPDATE_MODE,
PartialUpdateMode.IGNORE_MARKERS.name()); // to be fixed once we land PR #13721.
- } else {
- tablePropsToAdd.put(PARTIAL_UPDATE_MODE, PartialUpdateMode.NONE.name());
// to be fixed once we land PR #13721.
+ tablePropsToAdd.put(PARTIAL_UPDATE_MODE,
PartialUpdateMode.FILL_UNAVAILABLE.name());
}
}
@@ -194,7 +189,7 @@ public class EightToNineUpgradeHandler implements
UpgradeHandler {
AWSDmsAvroPayload.DELETE_OPERATION_VALUE);
} else if
(payloadClass.equals(PostgresDebeziumAvroPayload.class.getName())) {
tablePropsToAdd.put(
- ConfigProperty.key(RECORD_MERGE_PROPERTY_PREFIX +
PARTIAL_UPDATE_CUSTOM_MARKER).noDefaultValue(), // // to be fixed once we land
PR #13721.
+          ConfigProperty.key(RECORD_MERGE_PROPERTY_PREFIX +
PARTIAL_UPDATE_UNAVAILABLE_VALUE).noDefaultValue(),
DEBEZIUM_UNAVAILABLE_VALUE);
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java
index af927926785c..730cdc84d400 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java
@@ -42,7 +42,7 @@ import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY
import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
import static
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
import static
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
-import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE;
import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
import static
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
@@ -108,7 +108,7 @@ public class NineToEightDowngradeHandler implements
DowngradeHandler {
}
if
(legacyPayloadClass.equals(PostgresDebeziumAvroPayload.class.getName())) {
propertiesToRemove.add(
- ConfigProperty.key(RECORD_MERGE_PROPERTY_PREFIX +
PARTIAL_UPDATE_CUSTOM_MARKER).noDefaultValue());
+ ConfigProperty.key(RECORD_MERGE_PROPERTY_PREFIX +
PARTIAL_UPDATE_UNAVAILABLE_VALUE).noDefaultValue());
}
}
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
index ed6c8edd55e1..424273082413 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
@@ -69,12 +69,12 @@ import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY
import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
import static
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
import static
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
-import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE;
import static
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX;
-import static org.apache.hudi.common.table.PartialUpdateMode.IGNORE_MARKERS;
+import static org.apache.hudi.common.table.PartialUpdateMode.FILL_UNAVAILABLE;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -98,7 +98,7 @@ class TestEightToNineUpgradeHandler {
private final SupportsUpgradeDowngrade upgradeDowngradeHelper =
mock(SupportsUpgradeDowngrade.class);
private final HoodieWriteConfig config = mock(HoodieWriteConfig.class);
- private static final Map<ConfigProperty, String> DEFAULT_CONFIG_UPDATED =
Collections.singletonMap(PARTIAL_UPDATE_MODE, PartialUpdateMode.NONE.name());
+ private static final Map<ConfigProperty, String> DEFAULT_CONFIG_UPDATED =
Collections.emptyMap();
private static final Set<ConfigProperty> DEFAULT_CONFIG_REMOVED =
Collections.emptySet();
private static final UpgradeDowngrade.TableConfigChangeSet
DEFAULT_UPGRADE_RESULT =
new UpgradeDowngrade.TableConfigChangeSet(DEFAULT_CONFIG_UPDATED,
DEFAULT_CONFIG_REMOVED);
@@ -143,21 +143,21 @@ class TestEightToNineUpgradeHandler {
DefaultHoodieRecordPayload.class.getName(),
"",
null,
- PartialUpdateMode.NONE.name(),
+ null,
"DefaultHoodieRecordPayload"
),
Arguments.of(
EventTimeAvroPayload.class.getName(),
"",
EVENT_TIME_ORDERING.name(),
- PartialUpdateMode.NONE.name(),
+ null,
"EventTimeAvroPayload"
),
Arguments.of(
OverwriteWithLatestAvroPayload.class.getName(),
"",
null,
- PartialUpdateMode.NONE.name(),
+ null,
"OverwriteWithLatestAvroPayload"
),
Arguments.of(
@@ -165,15 +165,15 @@ class TestEightToNineUpgradeHandler {
RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=Op,"
+ RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=D", //
mergeProperties
COMMIT_TIME_ORDERING.name(),
- PartialUpdateMode.NONE.name(),
+ null,
"AWSDmsAvroPayload"
),
Arguments.of(
PostgresDebeziumAvroPayload.class.getName(),
- RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_CUSTOM_MARKER
+ RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE
+ "=" + DEBEZIUM_UNAVAILABLE_VALUE,
EVENT_TIME_ORDERING.name(),
- IGNORE_MARKERS.name(),
+ FILL_UNAVAILABLE.name(),
"PostgresDebeziumAvroPayload"
),
Arguments.of(
@@ -187,7 +187,7 @@ class TestEightToNineUpgradeHandler {
MySqlDebeziumAvroPayload.class.getName(),
"",
EVENT_TIME_ORDERING.name(),
- PartialUpdateMode.NONE.name(),
+ null,
"MySqlDebeziumAvroPayload"
),
Arguments.of(
@@ -241,8 +241,12 @@ class TestEightToNineUpgradeHandler {
assertEquals(expectedRecordMergeMode,
propertiesToAdd.get(RECORD_MERGE_MODE));
}
// Assert partial update mode
- assertTrue(propertiesToAdd.containsKey(PARTIAL_UPDATE_MODE));
- assertEquals(expectedPartialUpdateMode,
propertiesToAdd.get(PARTIAL_UPDATE_MODE));
+ if (expectedPartialUpdateMode != null) {
+ assertTrue(propertiesToAdd.containsKey(PARTIAL_UPDATE_MODE));
+ assertEquals(expectedPartialUpdateMode,
propertiesToAdd.get(PARTIAL_UPDATE_MODE));
+ } else {
+ assertFalse(propertiesToAdd.containsKey(PARTIAL_UPDATE_MODE));
+ }
// Assert payload class change
assertPayloadClassChange(propertiesToAdd, propertiesToRemove,
payloadClassName);
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java
index 6713f297afd6..0e37b9c8a717 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java
@@ -63,8 +63,8 @@ import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY
import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
import static
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
import static
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
-import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE;
import static
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX;
@@ -146,7 +146,7 @@ class TestNineToEightDowngradeHandler {
Arguments.of(
PostgresDebeziumAvroPayload.class.getName(),
3,
- LEGACY_PAYLOAD_CLASS_NAME.key() + "," + PARTIAL_UPDATE_MODE.key()
+ "," + RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_CUSTOM_MARKER,
+ LEGACY_PAYLOAD_CLASS_NAME.key() + "," + PARTIAL_UPDATE_MODE.key()
+ "," + RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE,
3,
true,
true,
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 693eac723318..f6463e4280ec 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -130,7 +130,7 @@ public class HoodieTableConfig extends HoodieConfig {
public static final String HOODIE_PROPERTIES_FILE_BACKUP =
"hoodie.properties.backup";
public static final String HOODIE_WRITE_TABLE_NAME_KEY =
"hoodie.datasource.write.table.name";
public static final String HOODIE_TABLE_NAME_KEY = "hoodie.table.name";
- public static final String PARTIAL_UPDATE_CUSTOM_MARKER =
"hoodie.write.partial.update.custom.marker";
+ public static final String PARTIAL_UPDATE_UNAVAILABLE_VALUE =
"hoodie.write.partial.update.unavailable.value";
public static final String DEBEZIUM_UNAVAILABLE_VALUE =
"__debezium_unavailable_value";
// This prefix is used to set merging related properties.
// A reader might need to read some writer properties to function as
expected,
@@ -363,12 +363,11 @@ public class HoodieTableConfig extends HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("When set to true, the table can support reading and
writing multiple base file formats.");
- public static final ConfigProperty<PartialUpdateMode> PARTIAL_UPDATE_MODE =
ConfigProperty
+ public static final ConfigProperty<String> PARTIAL_UPDATE_MODE =
ConfigProperty
.key("hoodie.table.partial.update.mode")
- .defaultValue(PartialUpdateMode.NONE)
+ .noDefaultValue()
.sinceVersion("1.1.0")
- .withDocumentation("This property when set, will define how two versions
of the record will be "
- + "merged together where the later contains only partial set of
values and not entire record.");
      .withDocumentation("This property, when set, will define how two versions
of the record will be merged together when records are partially formed.");
public static final ConfigProperty<String> URL_ENCODE_PARTITIONING =
KeyGeneratorOptions.URL_ENCODE_PARTITIONING;
public static final ConfigProperty<String> HIVE_STYLE_PARTITIONING_ENABLE =
KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;
@@ -871,13 +870,13 @@ public class HoodieTableConfig extends HoodieConfig {
||
payloadClassName.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName()))
{
reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(),
PartialUpdateMode.IGNORE_DEFAULTS.name());
} else if
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
- reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(),
PartialUpdateMode.IGNORE_MARKERS.name());
+ reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(),
PartialUpdateMode.FILL_UNAVAILABLE.name());
}
// Additional custom merge properties.
      // Certain payloads are migrated to the non-payload way from the 1.1 Hudi
binary and the reader might need certain properties for the
      merge to function as expected. Handling such special cases here.
if
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
- reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX +
PARTIAL_UPDATE_CUSTOM_MARKER, DEBEZIUM_UNAVAILABLE_VALUE);
+ reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX +
PARTIAL_UPDATE_UNAVAILABLE_VALUE, DEBEZIUM_UNAVAILABLE_VALUE);
} else if (payloadClassName.equals(AWSDmsAvroPayload.class.getName()))
{
reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY,
OP_FIELD);
reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER,
DELETE_OPERATION_VALUE);
@@ -1196,12 +1195,16 @@ public class HoodieTableConfig extends HoodieConfig {
CONFIG_VALUES_DELIMITER));
}
- public PartialUpdateMode getPartialUpdateMode() {
+ public Option<PartialUpdateMode> getPartialUpdateMode() {
if (getTableVersion().greaterThanOrEquals(HoodieTableVersion.NINE)) {
- return
PartialUpdateMode.valueOf(getStringOrDefault(PARTIAL_UPDATE_MODE));
+ if (contains(PARTIAL_UPDATE_MODE)) {
+ return
Option.of(PartialUpdateMode.valueOf(getString(PARTIAL_UPDATE_MODE)));
+ } else {
+ return Option.empty();
+ }
} else {
// For table version <= 8, partial update is not supported.
- return PartialUpdateMode.NONE;
+ return Option.empty();
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java
index d5622104e955..c0701f0459f5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java
@@ -22,18 +22,6 @@ package org.apache.hudi.common.table;
import org.apache.hudi.common.config.EnumFieldDescription;
public enum PartialUpdateMode {
- @EnumFieldDescription(
- "No partial update logic should be employed.")
- NONE,
-
- @EnumFieldDescription(
- "For any column values missing in current record, pick value from
previous version of the record.")
- KEEP_VALUES,
-
- @EnumFieldDescription(
- "For column values missing in current record, pick the default value
from the schema.")
- FILL_DEFAULTS,
-
   @EnumFieldDescription(
     "For columns having default values set in current record, pick the value
from previous version of the record. "
       + "Only top level data type default is checked, which means this mode
does not check leaf level data type default"
@@ -41,8 +29,7 @@ public enum PartialUpdateMode {
IGNORE_DEFAULTS,
@EnumFieldDescription(
- "For columns having marker in the current record, pick value from
previous version of the record during write."
- + "Marker value can be defined using
`hoodie.write.partial.update.custom.marker`, which should be added to"
- + "the value of table config `hoodie.write.partial.update.properties`.")
- IGNORE_MARKERS
+ "For columns having unavailable values in the current record, pick value
from previous version of the record during write. "
+ + "Unavailable value can be defined using
`hoodie.write.partial.update.unavailable.value` in the table property.")
+ FILL_UNAVAILABLE
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
index 8a12a13ec9cc..7ccc4c81d02d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
@@ -57,9 +57,9 @@ public class BufferedRecordMergerFactory {
Option<String> payloadClass,
Schema readerSchema,
TypedProperties props,
- PartialUpdateMode
partialUpdateMode) {
+ Option<PartialUpdateMode>
partialUpdateModeOpt) {
return create(readerContext, recordMergeMode, enablePartialMerging,
recordMerger,
- orderingFieldNames, readerSchema, payloadClass.map(p -> Pair.of(p,
p)), props, partialUpdateMode);
+ orderingFieldNames, readerSchema, payloadClass.map(p -> Pair.of(p,
p)), props, partialUpdateModeOpt);
}
public static <T> BufferedRecordMerger<T> create(HoodieReaderContext<T>
readerContext,
@@ -70,7 +70,7 @@ public class BufferedRecordMergerFactory {
Schema readerSchema,
Option<Pair<String,
String>> payloadClasses,
TypedProperties props,
- PartialUpdateMode
partialUpdateMode) {
+ Option<PartialUpdateMode>
partialUpdateModeOpt) {
/**
* This part implements KEEP_VALUES partial update mode, which merges two
records that do not have all columns.
* Other Partial update modes, like IGNORE_DEFAULTS assume all columns
exists in the record,
@@ -78,21 +78,21 @@ public class BufferedRecordMergerFactory {
*/
if (enablePartialMerging) {
BufferedRecordMerger<T> deleteRecordMerger = create(
- readerContext, recordMergeMode, false, recordMerger,
orderingFieldNames, readerSchema, payloadClasses, props, partialUpdateMode);
+ readerContext, recordMergeMode, false, recordMerger,
orderingFieldNames, readerSchema, payloadClasses, props, Option.empty());
return new
PartialUpdateBufferedRecordMerger<>(readerContext.getRecordContext(),
recordMerger, deleteRecordMerger, orderingFieldNames, readerSchema, props);
}
switch (recordMergeMode) {
case COMMIT_TIME_ORDERING:
- if (partialUpdateMode == PartialUpdateMode.NONE) {
+ if (partialUpdateModeOpt.isEmpty()) {
return new CommitTimeRecordMerger<>();
}
- return new
CommitTimePartialRecordMerger<>(readerContext.getRecordContext(),
partialUpdateMode, props);
+ return new
CommitTimePartialRecordMerger<>(readerContext.getRecordContext(),
partialUpdateModeOpt.get(), props);
case EVENT_TIME_ORDERING:
- if (partialUpdateMode == PartialUpdateMode.NONE) {
+ if (partialUpdateModeOpt.isEmpty()) {
return new EventTimeRecordMerger<>();
}
- return new
EventTimePartialRecordMerger<>(readerContext.getRecordContext(),
partialUpdateMode, props);
+ return new
EventTimePartialRecordMerger<>(readerContext.getRecordContext(),
partialUpdateModeOpt.get(), props);
default:
if (payloadClasses.isPresent()) {
if
(payloadClasses.get().getRight().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload"))
{
@@ -133,14 +133,14 @@ public class BufferedRecordMergerFactory {
* based on {@code COMMIT_TIME_ORDERING} merge mode and partial update mode.
*/
private static class CommitTimePartialRecordMerger<T> extends
CommitTimeRecordMerger<T> {
- private final PartialUpdateStrategy<T> partialUpdateStrategy;
+ private final PartialUpdateHandler<T> partialUpdateHandler;
private final RecordContext<T> recordContext;
public CommitTimePartialRecordMerger(RecordContext<T> recordContext,
PartialUpdateMode partialUpdateMode,
TypedProperties props) {
super();
- this.partialUpdateStrategy = new PartialUpdateStrategy<>(recordContext,
partialUpdateMode, props);
+ this.partialUpdateHandler = new PartialUpdateHandler<>(recordContext,
partialUpdateMode, props);
this.recordContext = recordContext;
}
@@ -148,7 +148,7 @@ public class BufferedRecordMergerFactory {
public Option<BufferedRecord<T>> deltaMerge(BufferedRecord<T> newRecord,
BufferedRecord<T>
existingRecord) {
if (existingRecord != null) {
- newRecord = partialUpdateStrategy.partialMerge(
+ newRecord = partialUpdateHandler.partialMerge(
newRecord,
existingRecord,
recordContext.getSchemaFromBufferRecord(newRecord),
@@ -161,7 +161,7 @@ public class BufferedRecordMergerFactory {
@Override
public BufferedRecord<T> finalMerge(BufferedRecord<T> olderRecord,
BufferedRecord<T> newerRecord) {
- newerRecord = partialUpdateStrategy.partialMerge(
+ newerRecord = partialUpdateHandler.partialMerge(
newerRecord,
olderRecord,
recordContext.getSchemaFromBufferRecord(newerRecord),
@@ -203,13 +203,13 @@ public class BufferedRecordMergerFactory {
* based on {@code EVENT_TIME_ORDERING} merge mode and partial update mode.
*/
private static class EventTimePartialRecordMerger<T> extends
EventTimeRecordMerger<T> {
- private final PartialUpdateStrategy<T> partialUpdateStrategy;
+ private final PartialUpdateHandler<T> partialUpdateHandler;
private final RecordContext<T> recordContext;
public EventTimePartialRecordMerger(RecordContext<T> recordContext,
PartialUpdateMode partialUpdateMode,
TypedProperties props) {
- this.partialUpdateStrategy = new PartialUpdateStrategy<>(recordContext,
partialUpdateMode, props);
+ this.partialUpdateHandler = new PartialUpdateHandler<>(recordContext,
partialUpdateMode, props);
this.recordContext = recordContext;
}
@@ -218,7 +218,7 @@ public class BufferedRecordMergerFactory {
if (existingRecord == null) {
return Option.of(newRecord);
} else if (shouldKeepNewerRecord(existingRecord, newRecord)) {
- newRecord = partialUpdateStrategy.partialMerge(
+ newRecord = partialUpdateHandler.partialMerge(
newRecord,
existingRecord,
recordContext.getSchemaFromBufferRecord(newRecord),
@@ -227,7 +227,7 @@ public class BufferedRecordMergerFactory {
return Option.of(newRecord);
} else {
// Use existing record as the base record since existing record has
higher ordering value.
- existingRecord = partialUpdateStrategy.partialMerge(
+ existingRecord = partialUpdateHandler.partialMerge(
existingRecord,
newRecord,
recordContext.getSchemaFromBufferRecord(existingRecord),
@@ -248,7 +248,7 @@ public class BufferedRecordMergerFactory {
if (!olderRecord.isCommitTimeOrderingDelete()
&& oldOrderingValue.compareTo(newOrderingValue) > 0) {
// Use old record as the base record since old record has higher
ordering value.
- olderRecord = partialUpdateStrategy.partialMerge(
+ olderRecord = partialUpdateHandler.partialMerge(
olderRecord,
newerRecord,
recordContext.getSchemaFromBufferRecord(olderRecord),
@@ -257,7 +257,7 @@ public class BufferedRecordMergerFactory {
return olderRecord;
}
- newerRecord = partialUpdateStrategy.partialMerge(
+ newerRecord = partialUpdateHandler.partialMerge(
newerRecord,
olderRecord,
recordContext.getSchemaFromBufferRecord(newerRecord),
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java
similarity index 93%
rename from
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java
rename to
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java
index 3a925cba9c58..c5ddedc1d714 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java
@@ -32,7 +32,7 @@ import java.util.Map;
import static org.apache.hudi.avro.HoodieAvroUtils.toJavaDefaultValue;
import static
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS;
-import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE;
import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX;
import static org.apache.hudi.common.util.ConfigUtils.extractWithPrefix;
@@ -42,14 +42,14 @@ import static
org.apache.hudi.common.util.ConfigUtils.extractWithPrefix;
* {@link BufferedRecordMergerFactory.CommitTimePartialRecordMerger} and
* {@link BufferedRecordMergerFactory.EventTimePartialRecordMerger}.
*/
-public class PartialUpdateStrategy<T> implements Serializable {
+public class PartialUpdateHandler<T> implements Serializable {
private final RecordContext<T> recordContext;
private final PartialUpdateMode partialUpdateMode;
private final Map<String, String> mergeProperties;
- public PartialUpdateStrategy(RecordContext<T> recordContext,
- PartialUpdateMode partialUpdateMode,
- TypedProperties props) {
+ public PartialUpdateHandler(RecordContext<T> recordContext,
+ PartialUpdateMode partialUpdateMode,
+ TypedProperties props) {
this.recordContext = recordContext;
this.partialUpdateMode = partialUpdateMode;
this.mergeProperties = parseMergeProperties(props);
@@ -67,21 +67,17 @@ public class PartialUpdateStrategy<T> implements
Serializable {
boolean keepOldMetadataColumns) {
// Note that: When either newRecord or oldRecord is a delete record,
// skip partial update since delete records do not have
meaningful columns.
- if (partialUpdateMode == PartialUpdateMode.NONE
- || null == oldRecord
+ if (null == oldRecord
|| newRecord.isDelete()
|| oldRecord.isDelete()) {
return newRecord;
}
switch (partialUpdateMode) {
- case KEEP_VALUES:
- case FILL_DEFAULTS:
- return newRecord;
case IGNORE_DEFAULTS:
return reconcileDefaultValues(
newRecord, oldRecord, newSchema, oldSchema,
keepOldMetadataColumns);
- case IGNORE_MARKERS:
+ case FILL_UNAVAILABLE:
return reconcileMarkerValues(
newRecord, oldRecord, newSchema, oldSchema);
default:
@@ -136,7 +132,7 @@ public class PartialUpdateStrategy<T> implements
Serializable {
List<Schema.Field> fields = newSchema.getFields();
Map<Integer, Object> updateValues = new HashMap<>();
T engineRecord;
- String partialUpdateCustomMarker =
mergeProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER);
+ String partialUpdateCustomMarker =
mergeProperties.get(PARTIAL_UPDATE_UNAVAILABLE_VALUE);
for (Schema.Field field : fields) {
String fieldName = field.name();
Object newValue = recordContext.getValue(newRecord.getRecord(),
newSchema, fieldName);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
index e3fb3bfc4467..3e0609003ad3 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
@@ -61,22 +61,22 @@ class DefaultFileGroupRecordBufferLoader<T> extends
LogScanningRecordBufferLoade
HoodieReadStats readStats,
Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) {
boolean isSkipMerge = ConfigUtils.getStringWithAltKeys(props,
HoodieReaderConfig.MERGE_TYPE,
true).equalsIgnoreCase(HoodieReaderConfig.REALTIME_SKIP_MERGE);
- PartialUpdateMode partialUpdateMode =
hoodieTableMetaClient.getTableConfig().getPartialUpdateMode();
+ Option<PartialUpdateMode> partialUpdateModeOpt =
hoodieTableMetaClient.getTableConfig().getPartialUpdateMode();
UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats,
readerContext, readerParameters.emitDeletes(), fileGroupUpdateCallback, props);
FileGroupRecordBuffer<T> recordBuffer;
if (isSkipMerge) {
recordBuffer = new UnmergedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateMode, props, readStats);
+ readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateModeOpt, props, readStats);
} else if (readerParameters.sortOutputs()) {
recordBuffer = new SortedKeyBasedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateMode, props, orderingFieldNames, updateProcessor);
+ readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
} else if (readerParameters.useRecordPosition() &&
inputSplit.getBaseFileOption().isPresent()) {
recordBuffer = new PositionBasedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateMode, inputSplit.getBaseFileOption().get().getCommitTime(), props,
+ readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateModeOpt, inputSplit.getBaseFileOption().get().getCommitTime(),
props,
orderingFieldNames, updateProcessor);
} else {
recordBuffer = new KeyBasedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateMode, props, orderingFieldNames, updateProcessor);
+ readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
}
return Pair.of(recordBuffer, scanLogFiles(readerContext, storage,
inputSplit, hoodieTableMetaClient, props,
readerParameters, readStats, recordBuffer));
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
index 7052ba8c59c0..cb0bf252a209 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
@@ -70,7 +70,7 @@ abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordBuffer<T
protected final Schema readerSchema;
protected final List<String> orderingFieldNames;
protected final RecordMergeMode recordMergeMode;
- protected final PartialUpdateMode partialUpdateMode;
+ protected final Option<PartialUpdateMode> partialUpdateModeOpt;
protected final Option<HoodieRecordMerger> recordMerger;
// The pair of payload classes represents the payload class for the table
and the payload class for the incoming records.
// The two classes are only expected to be different when there is a
merge-into operation that leverages the ExpressionPayload.
@@ -93,7 +93,7 @@ abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordBuffer<T
protected FileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
HoodieTableMetaClient hoodieTableMetaClient,
RecordMergeMode recordMergeMode,
- PartialUpdateMode partialUpdateMode,
+ Option<PartialUpdateMode>
partialUpdateModeOpt,
TypedProperties props,
List<String> orderingFieldNames,
UpdateProcessor<T> updateProcessor) {
@@ -101,7 +101,7 @@ abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordBuffer<T
this.updateProcessor = updateProcessor;
this.readerSchema =
AvroSchemaCache.intern(readerContext.getSchemaHandler().getRequiredSchema());
this.recordMergeMode = recordMergeMode;
- this.partialUpdateMode = partialUpdateMode;
+ this.partialUpdateModeOpt = partialUpdateModeOpt;
this.recordMerger = readerContext.getRecordMerger();
this.payloadClasses = readerContext.getPayloadClasses(props);
this.orderingFieldNames = orderingFieldNames;
@@ -117,7 +117,7 @@ abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordBuffer<T
throw new HoodieIOException("IOException when creating
ExternalSpillableMap at " + spillableMapBasePath, e);
}
this.bufferedRecordMerger = BufferedRecordMergerFactory.create(
- readerContext, recordMergeMode, enablePartialMerging, recordMerger,
orderingFieldNames, readerSchema, payloadClasses, props, partialUpdateMode);
+ readerContext, recordMergeMode, enablePartialMerging, recordMerger,
orderingFieldNames, readerSchema, payloadClasses, props, partialUpdateModeOpt);
this.deleteContext =
readerContext.getSchemaHandler().getDeleteContext().withReaderSchema(this.readerSchema);
this.bufferedRecordConverter =
BufferedRecordConverter.createConverter(readerContext.getIteratorMode(),
readerSchema, readerContext.getRecordContext(), orderingFieldNames);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/KeyBasedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/KeyBasedFileGroupRecordBuffer.java
index 5f2d35f1186a..d6663ae35223 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/KeyBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/KeyBasedFileGroupRecordBuffer.java
@@ -58,11 +58,11 @@ public class KeyBasedFileGroupRecordBuffer<T> extends
FileGroupRecordBuffer<T> {
public KeyBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
HoodieTableMetaClient
hoodieTableMetaClient,
RecordMergeMode recordMergeMode,
- PartialUpdateMode partialUpdateMode,
+ Option<PartialUpdateMode>
partialUpdateModeOpt,
TypedProperties props,
List<String> orderingFieldNames,
UpdateProcessor<T> updateProcessor) {
- super(readerContext, hoodieTableMetaClient, recordMergeMode,
partialUpdateMode, props, orderingFieldNames, updateProcessor);
+ super(readerContext, hoodieTableMetaClient, recordMergeMode,
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
}
@Override
@@ -87,7 +87,7 @@ public class KeyBasedFileGroupRecordBuffer<T> extends
FileGroupRecordBuffer<T> {
readerSchema,
payloadClasses,
props,
- partialUpdateMode);
+ partialUpdateModeOpt);
}
Schema schema =
AvroSchemaCache.intern(recordsIteratorSchemaPair.getRight());
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
index 51a48e34e8a4..415a5731e8af 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
@@ -73,12 +73,12 @@ public class PositionBasedFileGroupRecordBuffer<T> extends
KeyBasedFileGroupReco
public PositionBasedFileGroupRecordBuffer(HoodieReaderContext<T>
readerContext,
HoodieTableMetaClient
hoodieTableMetaClient,
RecordMergeMode recordMergeMode,
- PartialUpdateMode
partialUpdateMode,
+ Option<PartialUpdateMode>
partialUpdateModeOpt,
String baseFileInstantTime,
TypedProperties props,
List<String> orderingFieldNames,
UpdateProcessor<T>
updateProcessor) {
- super(readerContext, hoodieTableMetaClient, recordMergeMode,
partialUpdateMode, props, orderingFieldNames, updateProcessor);
+ super(readerContext, hoodieTableMetaClient, recordMergeMode,
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
this.baseFileInstantTime = baseFileInstantTime;
}
@@ -124,7 +124,7 @@ public class PositionBasedFileGroupRecordBuffer<T> extends
KeyBasedFileGroupReco
readerSchema,
payloadClasses,
props,
- partialUpdateMode);
+ partialUpdateModeOpt);
}
Pair<Function<T, T>, Schema> schemaTransformerWithEvolvedSchema =
getSchemaTransformerWithEvolvedSchema(dataBlock);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableFileGroupRecordBufferLoader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableFileGroupRecordBufferLoader.java
index deedd4808b0e..aa3fbd22cf94 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableFileGroupRecordBufferLoader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableFileGroupRecordBufferLoader.java
@@ -59,17 +59,17 @@ public class ReusableFileGroupRecordBufferLoader<T> extends
LogScanningRecordBuf
HoodieReadStats readStats,
Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) {
UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats,
readerContext, readerParameters.emitDeletes(), fileGroupUpdateCallback, props);
- PartialUpdateMode partialUpdateMode =
hoodieTableMetaClient.getTableConfig().getPartialUpdateMode();
+ Option<PartialUpdateMode> partialUpdateModeOpt =
hoodieTableMetaClient.getTableConfig().getPartialUpdateMode();
if (cachedResults == null) {
// Create an initial buffer to process the log files
KeyBasedFileGroupRecordBuffer<T> initialBuffer = new
KeyBasedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateMode, props, orderingFieldNames, updateProcessor);
+ readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
List<String> validInstants = scanLogFiles(readerContextWithoutFilters,
storage, inputSplit, hoodieTableMetaClient, props, readerParameters, readStats,
initialBuffer);
cachedResults = Pair.of(initialBuffer, validInstants);
}
// Create a reusable buffer with the results from the initial scan
ReusableKeyBasedRecordBuffer<T> reusableBuffer = new
ReusableKeyBasedRecordBuffer<>(
- readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateMode, props, orderingFieldNames, updateProcessor,
cachedResults.getLeft().getLogRecords());
+ readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor,
cachedResults.getLeft().getLogRecords());
return Pair.of(reusableBuffer, cachedResults.getRight());
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableKeyBasedRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableKeyBasedRecordBuffer.java
index fef96a8c21cf..bce8c7f9e74f 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableKeyBasedRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableKeyBasedRecordBuffer.java
@@ -55,10 +55,10 @@ public class ReusableKeyBasedRecordBuffer<T> extends
FileGroupRecordBuffer<T> {
private final Map<Serializable, BufferedRecord<T>> existingRecords;
ReusableKeyBasedRecordBuffer(HoodieReaderContext<T> readerContext,
HoodieTableMetaClient hoodieTableMetaClient,
- RecordMergeMode recordMergeMode,
PartialUpdateMode partialUpdateMode,
+ RecordMergeMode recordMergeMode,
Option<PartialUpdateMode> partialUpdateModeOpt,
TypedProperties props, List<String>
orderingFieldNames,
UpdateProcessor<T> updateProcessor,
Map<Serializable, BufferedRecord<T>> records) {
- super(readerContext, hoodieTableMetaClient, recordMergeMode,
partialUpdateMode, props, orderingFieldNames, updateProcessor);
+ super(readerContext, hoodieTableMetaClient, recordMergeMode,
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
this.existingRecords = records;
ValidationUtils.checkArgument(readerContext.getKeyFilterOpt().orElse(null)
instanceof Predicates.In,
() -> "Key filter should be of type Predicates.In, but found: " +
readerContext.getKeyFilterOpt().map(filter ->
filter.getClass().getSimpleName()).orElse("NULL"));
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/SortedKeyBasedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/SortedKeyBasedFileGroupRecordBuffer.java
index c40dfca70804..686b044a6220 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/SortedKeyBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/SortedKeyBasedFileGroupRecordBuffer.java
@@ -49,11 +49,11 @@ class SortedKeyBasedFileGroupRecordBuffer<T> extends
KeyBasedFileGroupRecordBuff
SortedKeyBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
HoodieTableMetaClient
hoodieTableMetaClient,
RecordMergeMode recordMergeMode,
- PartialUpdateMode
partialUpdateMode,
+ Option<PartialUpdateMode>
partialUpdateModeOpt,
TypedProperties props,
List<String> orderingFieldNames,
UpdateProcessor<T>
updateProcessor) {
- super(readerContext, hoodieTableMetaClient, recordMergeMode,
partialUpdateMode, props, orderingFieldNames, updateProcessor);
+ super(readerContext, hoodieTableMetaClient, recordMergeMode,
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
}
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
index ebe09133ab5c..434af97c1fb1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
@@ -67,15 +67,15 @@ public class StreamingFileGroupRecordBufferLoader<T>
implements FileGroupRecordB
Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) {
Schema recordSchema =
HoodieAvroUtils.removeMetadataFields(readerContext.getSchemaHandler().getRequestedSchema());
HoodieTableConfig tableConfig = hoodieTableMetaClient.getTableConfig();
- PartialUpdateMode partialUpdateMode = tableConfig.getPartialUpdateMode();
+ Option<PartialUpdateMode> partialUpdateModeOpt =
tableConfig.getPartialUpdateMode();
UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats,
readerContext, readerParameters.emitDeletes(), fileGroupUpdateCallback, props);
FileGroupRecordBuffer<T> recordBuffer;
if (readerParameters.sortOutputs()) {
recordBuffer = new SortedKeyBasedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateMode, props, orderingFieldNames, updateProcessor);
+ readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
} else {
recordBuffer = new KeyBasedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateMode, props, orderingFieldNames, updateProcessor);
+ readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
}
RecordContext<T> recordContext = readerContext.getRecordContext();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/UnmergedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/UnmergedFileGroupRecordBuffer.java
index 3574ef35552e..580cb88e174b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/UnmergedFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/UnmergedFileGroupRecordBuffer.java
@@ -53,10 +53,10 @@ class UnmergedFileGroupRecordBuffer<T> extends
FileGroupRecordBuffer<T> {
HoodieReaderContext<T> readerContext,
HoodieTableMetaClient hoodieTableMetaClient,
RecordMergeMode recordMergeMode,
- PartialUpdateMode partialUpdateMode,
+ Option<PartialUpdateMode> partialUpdateModeOpt,
TypedProperties props,
HoodieReadStats readStats) {
- super(readerContext, hoodieTableMetaClient, recordMergeMode,
partialUpdateMode, props, Collections.emptyList(), null);
+ super(readerContext, hoodieTableMetaClient, recordMergeMode,
partialUpdateModeOpt, props, Collections.emptyList(), null);
this.readStats = readStats;
this.currentInstantLogBlocks = new ArrayDeque<>();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 494be05327c0..aa993624dbd0 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -75,7 +75,6 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.PartialUpdateMode;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
import org.apache.hudi.common.table.read.UpdateProcessor;
@@ -1047,7 +1046,7 @@ public class HoodieTableMetadataUtil {
readerContext.setSchemaHandler(new
FileGroupReaderSchemaHandler<>(readerContext, writerSchemaOpt.get(),
writerSchemaOpt.get(), Option.empty(), properties, datasetMetaClient));
HoodieReadStats readStats = new HoodieReadStats();
KeyBasedFileGroupRecordBuffer<T> recordBuffer = new
KeyBasedFileGroupRecordBuffer<>(readerContext, datasetMetaClient,
- readerContext.getMergeMode(), PartialUpdateMode.NONE, properties,
tableConfig.getPreCombineFields(),
+ readerContext.getMergeMode(), Option.empty(), properties,
tableConfig.getPreCombineFields(),
UpdateProcessor.create(readStats, readerContext, true,
Option.empty(), properties));
// CRITICAL: Ensure allowInflightInstants is set to true
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateStrategy.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateHandler.java
similarity index 82%
rename from
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateStrategy.java
rename to
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateHandler.java
index ca4b099b9484..008c70916fae 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateStrategy.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateHandler.java
@@ -29,18 +29,18 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-class TestPartialUpdateStrategy {
+class TestPartialUpdateHandler {
@Test
void testEmptyProperties() {
TypedProperties props = new TypedProperties();
- Map<String, String> result =
PartialUpdateStrategy.parseMergeProperties(props);
+ Map<String, String> result =
PartialUpdateHandler.parseMergeProperties(props);
assertTrue(result.isEmpty());
}
@Test
void testDirectMatch() {
Schema stringSchema = Schema.create(Schema.Type.STRING);
- assertTrue(PartialUpdateStrategy.hasTargetType(stringSchema,
Schema.Type.STRING));
+ assertTrue(PartialUpdateHandler.hasTargetType(stringSchema,
Schema.Type.STRING));
}
@Test
@@ -50,7 +50,7 @@ class TestPartialUpdateStrategy {
Schema.create(Schema.Type.BOOLEAN),
Schema.create(Schema.Type.STRING)
);
- assertTrue(PartialUpdateStrategy.hasTargetType(unionSchema,
Schema.Type.STRING));
+ assertTrue(PartialUpdateHandler.hasTargetType(unionSchema,
Schema.Type.STRING));
}
@Test
@@ -60,12 +60,12 @@ class TestPartialUpdateStrategy {
Schema.create(Schema.Type.BOOLEAN),
Schema.create(Schema.Type.INT)
);
- assertFalse(PartialUpdateStrategy.hasTargetType(unionSchema,
Schema.Type.STRING));
+ assertFalse(PartialUpdateHandler.hasTargetType(unionSchema,
Schema.Type.STRING));
}
@Test
void testNonUnionNonTargetType() {
Schema intSchema = Schema.create(Schema.Type.INT);
- assertFalse(PartialUpdateStrategy.hasTargetType(intSchema,
Schema.Type.STRING));
+ assertFalse(PartialUpdateHandler.hasTargetType(intSchema,
Schema.Type.STRING));
}
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
index a0c0b27ef537..579f6eaf0b49 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
@@ -32,7 +32,6 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.PartialUpdateMode;
import org.apache.hudi.common.table.read.DeleteContext;
import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
import org.apache.hudi.common.table.read.HoodieReadStats;
@@ -125,7 +124,7 @@ public class BaseTestFileGroupRecordBuffer {
if (fileGroupRecordBufferItrOpt.isEmpty()) {
return new KeyBasedFileGroupRecordBuffer<>(
- readerContext, mockMetaClient, recordMergeMode,
PartialUpdateMode.NONE, props, orderingFieldNames, updateProcessor);
+ readerContext, mockMetaClient, recordMergeMode, Option.empty(),
props, orderingFieldNames, updateProcessor);
} else {
FileGroupRecordBufferLoader recordBufferLoader =
FileGroupRecordBufferLoader.createStreamingRecordsBufferLoader();
InputSplit inputSplit = mock(InputSplit.class);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBuffer.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBuffer.java
index 67ace014bd61..330a57f19794 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBuffer.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBuffer.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.engine.RecordContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.serialization.DefaultSerializer;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.PartialUpdateMode;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.table.read.BufferedRecords;
import org.apache.hudi.common.table.read.DeleteContext;
@@ -161,7 +160,7 @@ class TestFileGroupRecordBuffer {
readerContext,
hoodieTableMetaClient,
RecordMergeMode.COMMIT_TIME_ORDERING,
- PartialUpdateMode.NONE,
+ Option.empty(),
props,
Collections.emptyList(),
updateProcessor
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
index 67b03db2c0b2..649b6ee1720c 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
@@ -63,6 +63,7 @@ public class TestFileGroupRecordBufferLoader extends
BaseTestFileGroupRecordBuff
HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.COMMIT_TIME_ORDERING);
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.NINE);
+ when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
when(tableConfig.getPreCombineFieldsStr()).thenReturn(Option.empty());
when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
index b4011a7205a5..4e85f74bbd37 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
@@ -30,7 +30,6 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
-import org.apache.hudi.common.table.PartialUpdateMode;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
@@ -146,7 +145,7 @@ class TestKeyBasedFileGroupRecordBuffer extends
BaseTestFileGroupRecordBuffer {
properties.setProperty(DELETE_MARKER, "3");
HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.EVENT_TIME_ORDERING);
-
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+ when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
@@ -213,7 +212,7 @@ class TestKeyBasedFileGroupRecordBuffer extends
BaseTestFileGroupRecordBuffer {
properties.setProperty(DELETE_MARKER, "3");
HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.COMMIT_TIME_ORDERING);
-
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+ when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
@@ -247,7 +246,7 @@ class TestKeyBasedFileGroupRecordBuffer extends
BaseTestFileGroupRecordBuffer {
when(tableConfig.getPayloadClass()).thenReturn(CustomPayload.class.getName());
when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
-
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+ when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(),
Option.empty());
KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer =
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, new
HoodieAvroRecordMerger(),
@@ -292,7 +291,7 @@ class TestKeyBasedFileGroupRecordBuffer extends
BaseTestFileGroupRecordBuffer {
when(tableConfig.getPayloadClass()).thenReturn(TestKeyBasedFileGroupRecordBuffer.CustomPayload.class.getName());
when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
-
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+ when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
@@ -366,7 +365,7 @@ class TestKeyBasedFileGroupRecordBuffer extends
BaseTestFileGroupRecordBuffer {
when(tableConfig.getPayloadClass()).thenReturn(CustomPayload.class.getName());
when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
-
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+ when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestReusableKeyBasedRecordBuffer.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestReusableKeyBasedRecordBuffer.java
index 9f3f6d8d44e1..50a8f1b43dd0 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestReusableKeyBasedRecordBuffer.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestReusableKeyBasedRecordBuffer.java
@@ -22,7 +22,6 @@ import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.PartialUpdateMode;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.table.read.HoodieReadStats;
import org.apache.hudi.common.table.read.UpdateProcessor;
@@ -90,7 +89,7 @@ class TestReusableKeyBasedRecordBuffer {
when(mockReaderContext.getRecordContext().seal(any())).thenAnswer(invocation ->
invocation.getArgument(0));
ReusableKeyBasedRecordBuffer<TestRecord> buffer = new
ReusableKeyBasedRecordBuffer<>(mockReaderContext, metaClient,
- RecordMergeMode.EVENT_TIME_ORDERING, PartialUpdateMode.NONE, new
TypedProperties(), Collections.singletonList("value"), updateProcessor,
preMergedLogRecords);
+ RecordMergeMode.EVENT_TIME_ORDERING, Option.empty(), new
TypedProperties(), Collections.singletonList("value"), updateProcessor,
preMergedLogRecords);
List<TestRecord> baseFileRecords = Arrays.asList(new TestRecord("1", 10),
new TestRecord("3", 30));
buffer.setBaseFileIterator(ClosableIterator.wrap(baseFileRecords.iterator()));
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
index a1ec3f66caca..1f2394db848e 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
@@ -133,7 +133,7 @@ class TestSortedKeyBasedFileGroupRecordBuffer extends
BaseTestFileGroupRecordBuf
HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class,
RETURNS_DEEP_STUBS);
when(mockMetaClient.getTableConfig()).thenReturn(tableConfig);
when(tableConfig.getPayloadClass()).thenReturn(DefaultHoodieRecordPayload.class.getName());
-
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+ when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
FileGroupRecordBufferLoader recordBufferLoader =
FileGroupRecordBufferLoader.createStreamingRecordsBufferLoader();
InputSplit inputSplit = mock(InputSplit.class);
@@ -199,12 +199,12 @@ class TestSortedKeyBasedFileGroupRecordBuffer extends
BaseTestFileGroupRecordBuf
when(mockReaderContext.getRecordContext().seal(any())).thenAnswer(invocation ->
invocation.getArgument(0));
HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
RecordMergeMode recordMergeMode = RecordMergeMode.COMMIT_TIME_ORDERING;
- PartialUpdateMode partialUpdateMode = PartialUpdateMode.NONE;
+ Option<PartialUpdateMode> partialUpdateModeOpt = Option.empty();
TypedProperties props = new TypedProperties();
when(mockReaderContext.getPayloadClasses(any())).thenReturn(Option.empty());
UpdateProcessor<TestRecord> updateProcessor =
UpdateProcessor.create(readStats, mockReaderContext, false, Option.empty(),
props);
return new SortedKeyBasedFileGroupRecordBuffer<>(
- mockReaderContext, mockMetaClient, recordMergeMode, partialUpdateMode,
props, Collections.emptyList(), updateProcessor);
+ mockReaderContext, mockMetaClient, recordMergeMode,
partialUpdateModeOpt, props, Collections.emptyList(), updateProcessor);
}
private static <T> List<T>
getActualRecordsForSortedKeyBased(SortedKeyBasedFileGroupRecordBuffer<T>
fileGroupRecordBuffer) throws IOException {
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
index 8c7e5d65a530..ee77ee1bf31c 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
-import org.apache.hudi.common.table.PartialUpdateMode;
import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
import org.apache.hudi.common.table.read.HoodieReadStats;
import org.apache.hudi.common.util.Option;
@@ -76,7 +75,7 @@ class TestStreamingKeyBasedFileGroupRecordBuffer extends
BaseTestFileGroupRecord
properties.setProperty(DELETE_MARKER, "3");
HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.EVENT_TIME_ORDERING);
-
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+ when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
@@ -113,7 +112,7 @@ class TestStreamingKeyBasedFileGroupRecordBuffer extends
BaseTestFileGroupRecord
properties.setProperty(DELETE_MARKER, "3");
HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.COMMIT_TIME_ORDERING);
-
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+ when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
@@ -152,7 +151,7 @@ class TestStreamingKeyBasedFileGroupRecordBuffer extends
BaseTestFileGroupRecord
when(tableConfig.getPayloadClass()).thenReturn(CustomPayload.class.getName());
when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
-
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+ when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
@@ -189,7 +188,7 @@ class TestStreamingKeyBasedFileGroupRecordBuffer extends
BaseTestFileGroupRecord
when(tableConfig.getPayloadClass()).thenReturn(CustomPayload.class.getName());
when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
-
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+ when(tableConfig.getPartialUpdateMode()).thenReturn(Option.empty());
when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index f09fc348b101..a25856ca3bd8 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -170,7 +170,7 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness
{
assertTrue(storage.exists(cfgPath));
assertFalse(storage.exists(backupCfgPath));
- HoodieTableConfig config = new HoodieTableConfig(storage, metaPath, null,
null, null);
+ HoodieTableConfig config = new HoodieTableConfig(storage, metaPath);
assertEquals(9, config.getProps().size());
assertEquals("test-table2", config.getTableName());
assertEquals(Collections.singletonList("new_field"),
config.getPreCombineFields());
@@ -184,7 +184,7 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness
{
// delete a non-existent property as well
propsToDelete.add(HoodieTableConfig.RECORDKEY_FIELDS.key());
HoodieTableConfig.updateAndDeleteProps(storage, metaPath, updatedProps,
propsToDelete);
- config = new HoodieTableConfig(storage, metaPath, null, null, null);
+ config = new HoodieTableConfig(storage, metaPath);
assertEquals(8, config.getProps().size());
assertEquals("test-table2", config.getTableName());
assertEquals(Collections.singletonList("new_field2"),
config.getPreCombineFields());
@@ -193,7 +193,7 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness
{
// just delete 1 property w/o updating anything.
updatedProps = new Properties();
HoodieTableConfig.updateAndDeleteProps(storage, metaPath, updatedProps,
Collections.singleton(HoodieTableConfig.PRECOMBINE_FIELDS.key()));
- config = new HoodieTableConfig(storage, metaPath, null, null, null);
+ config = new HoodieTableConfig(storage, metaPath);
assertEquals(7, config.getProps().size());
assertEquals("test-table2", config.getTableName());
assertTrue(config.getPreCombineFields().isEmpty());
@@ -608,7 +608,7 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness
{
// Test case: Version 9 table with PostgresDebeziumAvroPayload (should
set partial update mode and custom properties)
arguments("Version 9 with PostgresDebeziumAvroPayload", null,
PostgresDebeziumAvroPayload.class.getName(), null, "ts",
HoodieTableVersion.NINE,
5, EVENT_TIME_ORDERING.name(), null,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
PostgresDebeziumAvroPayload.class.getName(),
- PartialUpdateMode.IGNORE_MARKERS.name(),
HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE, null, null),
+ PartialUpdateMode.FILL_UNAVAILABLE.name(),
HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE, null, null),
// Test case: Version 9 table with AWSDmsAvroPayload (should set
custom delete properties)
arguments("Version 9 with AWSDmsAvroPayload", null,
AWSDmsAvroPayload.class.getName(), null, null, HoodieTableVersion.NINE,
@@ -674,11 +674,11 @@ class TestHoodieTableConfig extends
HoodieCommonTestHarness {
if (expectedDebeziumMarker != null) {
assertEquals(expectedDebeziumMarker, configs.get(
- HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER),
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE),
"Debezium marker mismatch for: " + testName);
} else {
-
assertFalse(configs.containsKey(HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX
+ HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER),
- "Custom merge property " +
HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER + " not expected to be set");
+
assertFalse(configs.containsKey(HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX
+ HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE),
+ "Custom merge property " +
HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE + " not expected to be set");
}
if (expectedDeleteKey != null) {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index e232bf953e85..c0872b9f7fbe 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -89,12 +89,12 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
} else {
Option.empty.asInstanceOf[Option[String]]
}
- private lazy val partialUpdateMode: PartialUpdateMode =
metaClient.getTableConfig.getPartialUpdateMode
+ private lazy val partialUpdateModeOpt: Option[PartialUpdateMode] =
metaClient.getTableConfig.getPartialUpdateMode
private var isPartialMergeEnabled = false
private var bufferedRecordMerger = getBufferedRecordMerger
private def getBufferedRecordMerger: BufferedRecordMerger[InternalRow] =
BufferedRecordMergerFactory.create(readerContext,
readerContext.getMergeMode, isPartialMergeEnabled,
Option.of(recordMerger), orderingFieldNames,
- payloadClass, avroSchema, props, partialUpdateMode)
+ payloadClass, avroSchema, props, partialUpdateModeOpt)
private lazy val storage = metaClient.getStorage
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
index a5b2b996d9f6..cddfb1f6d4b4 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
@@ -75,7 +75,7 @@ import java.util.stream.Stream;
import static
org.apache.hudi.BaseSparkInternalRecordContext.getFieldValueFromInternalRow;
import static
org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING;
import static
org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
-import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE;
import static org.apache.hudi.common.table.HoodieTableConfig.PRECOMBINE_FIELDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -124,9 +124,9 @@ class TestBufferedRecordMerger extends
SparkClientFunctionalTestHarness {
@ParameterizedTest
@MethodSource("mergeModeAndStageProvider")
void testRegularMerging(RecordMergeMode mergeMode, PartialUpdateMode
updateMode, MergeStage stage) throws IOException {
- if (updateMode == PartialUpdateMode.IGNORE_MARKERS) {
+ if (updateMode == PartialUpdateMode.FILL_UNAVAILABLE) {
props.put(
- HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
PARTIAL_UPDATE_CUSTOM_MARKER,
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
PARTIAL_UPDATE_UNAVAILABLE_VALUE,
IGNORE_MARKERS_VALUE);
}
@@ -140,7 +140,7 @@ class TestBufferedRecordMerger extends
SparkClientFunctionalTestHarness {
}
private void runDeltaMerge(RecordMergeMode mergeMode, PartialUpdateMode
updateMode) throws IOException {
- BufferedRecordMerger<InternalRow> merger = createMerger(readerContext,
mergeMode, updateMode);
+ BufferedRecordMerger<InternalRow> merger = createMerger(readerContext,
mergeMode, Option.of(updateMode));
// Create records with all columns.
InternalRow oldRecord = createFullRecord("old_id", "Old Name", 25, "Old
City", 1000L);
InternalRow newRecord = createFullRecord("new_id", "New Name", 0,
IGNORE_MARKERS_VALUE, 0L);
@@ -153,16 +153,16 @@ class TestBufferedRecordMerger extends
SparkClientFunctionalTestHarness {
Option<BufferedRecord<InternalRow>> deltaResult =
merger.deltaMerge(newBufferedRecord, oldBufferedRecord);
if (mergeMode == COMMIT_TIME_ORDERING) {
assertTrue(deltaResult.isPresent());
- if (updateMode == PartialUpdateMode.NONE) {
+ if (updateMode == null) {
assertEquals(newRecord, deltaResult.get().getRecord());
} else if (updateMode == PartialUpdateMode.IGNORE_DEFAULTS) {
assertEquals(25, deltaResult.get().getRecord().getInt(2));
assertEquals(1000L, deltaResult.get().getRecord().getLong(4));
- } else if (updateMode == PartialUpdateMode.IGNORE_MARKERS) {
+ } else if (updateMode == PartialUpdateMode.FILL_UNAVAILABLE) {
assertEquals("Old City", deltaResult.get().getRecord().getString(3));
}
} else {
- if (updateMode == PartialUpdateMode.NONE) {
+ if (updateMode == null) {
assertTrue(deltaResult.isEmpty());
} else if (updateMode == PartialUpdateMode.IGNORE_DEFAULTS) {
assertTrue(deltaResult.isPresent());
@@ -175,12 +175,12 @@ class TestBufferedRecordMerger extends
SparkClientFunctionalTestHarness {
newBufferedRecord = new BufferedRecord<>(RECORD_KEY, ORDERING_VALUE + 1,
newRecord, 1, null);
deltaResult = merger.deltaMerge(newBufferedRecord, oldBufferedRecord);
assertTrue(deltaResult.isPresent());
- if (updateMode == PartialUpdateMode.NONE) {
+ if (updateMode == null) {
assertEquals(newRecord, deltaResult.get().getRecord());
} else if (updateMode == PartialUpdateMode.IGNORE_DEFAULTS) {
assertEquals(25, deltaResult.get().getRecord().getInt(2));
assertEquals(1000L, deltaResult.get().getRecord().getLong(4));
- } else if (updateMode == PartialUpdateMode.IGNORE_MARKERS) {
+ } else if (updateMode == PartialUpdateMode.FILL_UNAVAILABLE) {
assertEquals("Old City", deltaResult.get().getRecord().getString(3));
}
@@ -189,18 +189,18 @@ class TestBufferedRecordMerger extends
SparkClientFunctionalTestHarness {
newBufferedRecord = new BufferedRecord<>(RECORD_KEY, ORDERING_VALUE,
newRecord, 1, null);
deltaResult = merger.deltaMerge(newBufferedRecord, oldBufferedRecord);
assertTrue(deltaResult.isPresent());
- if (updateMode == PartialUpdateMode.NONE) {
+ if (updateMode == null) {
assertEquals(newRecord, deltaResult.get().getRecord());
} else if (updateMode == PartialUpdateMode.IGNORE_DEFAULTS) {
assertEquals(25, deltaResult.get().getRecord().getInt(2));
assertEquals(1000L, deltaResult.get().getRecord().getLong(4));
- } else if (updateMode == PartialUpdateMode.IGNORE_MARKERS) {
+ } else if (updateMode == PartialUpdateMode.FILL_UNAVAILABLE) {
assertEquals("Old City", deltaResult.get().getRecord().getString(3));
}
}
private void runDeltaDeleteMerge(RecordMergeMode mergeMode,
PartialUpdateMode updateMode) throws IOException {
- BufferedRecordMerger<InternalRow> merger = createMerger(readerContext,
mergeMode, updateMode);
+ BufferedRecordMerger<InternalRow> merger = createMerger(readerContext,
mergeMode, Option.ofNullable(updateMode));
// Create records with all columns.
InternalRow oldRecord = createFullRecord("old_id", "Old Name", 25, "Old
City", 1000L);
InternalRow newRecord = createFullRecord("new_id", "New Name", 0,
IGNORE_MARKERS_VALUE, 0L);
@@ -231,7 +231,7 @@ class TestBufferedRecordMerger extends
SparkClientFunctionalTestHarness {
}
private void runFinalMerge(RecordMergeMode mergeMode, PartialUpdateMode
updateMode) throws IOException {
- BufferedRecordMerger<InternalRow> merger = createMerger(readerContext,
mergeMode, updateMode);
+ BufferedRecordMerger<InternalRow> merger = createMerger(readerContext,
mergeMode, Option.ofNullable(updateMode));
InternalRow oldRecord = createFullRecord(
"older_id", "Older Name", 20, "Older City", 500L);
InternalRow newRecord = createFullRecord(
@@ -245,12 +245,12 @@ class TestBufferedRecordMerger extends
SparkClientFunctionalTestHarness {
BufferedRecord<InternalRow> finalResult =
merger.finalMerge(olderBufferedRecord, newerBufferedRecord);
assertFalse(finalResult.isDelete());
if (mergeMode == COMMIT_TIME_ORDERING) {
- if (updateMode == PartialUpdateMode.NONE) {
+ if (updateMode == null) {
assertEquals(newRecord, finalResult.getRecord());
} else if (updateMode == PartialUpdateMode.IGNORE_DEFAULTS) {
assertEquals(20, finalResult.getRecord().getInt(2));
assertEquals(500L, finalResult.getRecord().getLong(4));
- } else if (updateMode == PartialUpdateMode.IGNORE_MARKERS) {
+ } else if (updateMode == PartialUpdateMode.FILL_UNAVAILABLE) {
assertEquals("Older City", finalResult.getRecord().getString(3));
}
} else {
@@ -262,12 +262,12 @@ class TestBufferedRecordMerger extends
SparkClientFunctionalTestHarness {
newerBufferedRecord = new BufferedRecord<>(RECORD_KEY, ORDERING_VALUE + 1,
newRecord, 1, null);
finalResult = merger.finalMerge(olderBufferedRecord, newerBufferedRecord);
assertFalse(finalResult.isDelete());
- if (updateMode == PartialUpdateMode.NONE) {
+ if (updateMode == null) {
assertEquals(newRecord, finalResult.getRecord());
} else if (updateMode == PartialUpdateMode.IGNORE_DEFAULTS) {
assertEquals(20, finalResult.getRecord().getInt(2));
assertEquals(500, finalResult.getRecord().getLong(4));
- } else if (updateMode == PartialUpdateMode.IGNORE_MARKERS) {
+ } else if (updateMode == PartialUpdateMode.FILL_UNAVAILABLE) {
assertEquals("Older City", finalResult.getRecord().getString(3));
}
@@ -276,12 +276,12 @@ class TestBufferedRecordMerger extends
SparkClientFunctionalTestHarness {
newerBufferedRecord = new BufferedRecord<>(RECORD_KEY, ORDERING_VALUE,
newRecord, 1, null);
finalResult = merger.finalMerge(olderBufferedRecord, newerBufferedRecord);
assertFalse(finalResult.isDelete());
- if (updateMode == PartialUpdateMode.NONE) {
+ if (updateMode == null) {
assertEquals(newRecord, finalResult.getRecord());
} else if (updateMode == PartialUpdateMode.IGNORE_DEFAULTS) {
assertEquals(20, finalResult.getRecord().getInt(2));
assertEquals(500, finalResult.getRecord().getLong(4));
- } else if (updateMode == PartialUpdateMode.IGNORE_MARKERS) {
+ } else if (updateMode == PartialUpdateMode.FILL_UNAVAILABLE) {
assertEquals("Older City", finalResult.getRecord().getString(3));
}
}
@@ -381,7 +381,7 @@ class TestBufferedRecordMerger extends
SparkClientFunctionalTestHarness {
@EnumSource(value = RecordMergeMode.class, names = {"COMMIT_TIME_ORDERING",
"EVENT_TIME_ORDERING"})
void testRegularMergingWithIgnoreDefaultsNested(RecordMergeMode mergeMode)
throws IOException {
BufferedRecordMerger<InternalRow> ignoreDefaultsMerger =
- createMerger(readerContext, mergeMode,
PartialUpdateMode.IGNORE_DEFAULTS);
+ createMerger(readerContext, mergeMode,
Option.of(PartialUpdateMode.IGNORE_DEFAULTS));
// Old record has all columns, new record has some columns with
default/null values
InternalRow oldRecordWithDefaults = createFullRecordWithCompany(
@@ -412,7 +412,7 @@ class TestBufferedRecordMerger extends
SparkClientFunctionalTestHarness {
@EnumSource(value = RecordMergeMode.class, names = {"COMMIT_TIME_ORDERING",
"EVENT_TIME_ORDERING"})
void testDeltaMergeWithNullExistingRecord(RecordMergeMode mergeMode) throws
IOException {
BufferedRecordMerger<InternalRow> merger = createMerger(
- readerContext, mergeMode, PartialUpdateMode.NONE);
+ readerContext, mergeMode, Option.empty());
// New record is not delete.
InternalRow newRecord = createFullRecord(
@@ -449,7 +449,7 @@ class TestBufferedRecordMerger extends
SparkClientFunctionalTestHarness {
@ParameterizedTest
@EnumSource(value = RecordMergeMode.class, names = {"COMMIT_TIME_ORDERING",
"EVENT_TIME_ORDERING"})
void testDeltaMergeWithDeleteRecord(RecordMergeMode mergeMode) {
- BufferedRecordMerger<InternalRow> merger = createMerger(readerContext,
mergeMode, PartialUpdateMode.NONE);
+ BufferedRecordMerger<InternalRow> merger = createMerger(readerContext,
mergeMode, Option.empty());
// Delete record has null ordering value.
InternalRow oldRecord = createFullRecord("old_id", "Old Name", 25, "Old
City", 1000L);
@@ -553,7 +553,7 @@ class TestBufferedRecordMerger extends
SparkClientFunctionalTestHarness {
@EnumSource(value = RecordMergeMode.class, names = {"COMMIT_TIME_ORDERING",
"EVENT_TIME_ORDERING"})
void testFinalMergeWithDeleteRecords(RecordMergeMode mergeMode) throws
IOException {
BufferedRecordMerger<InternalRow> merger = createMerger(
- readerContext, mergeMode, PartialUpdateMode.NONE);
+ readerContext, mergeMode, Option.empty());
InternalRow oldRecord = createFullRecord("old_id", "Old Name", 25, "Old
City", 1000L);
InternalRow newRecord = createFullRecord("new_id", "New Name", 29, "New
City", 2000L);
@@ -608,7 +608,7 @@ class TestBufferedRecordMerger extends
SparkClientFunctionalTestHarness {
)
.filter(args -> {
PartialUpdateMode updateMode = (PartialUpdateMode) args.get()[1];
- return updateMode != PartialUpdateMode.FILL_DEFAULTS && updateMode
!= PartialUpdateMode.KEEP_VALUES;
+ return updateMode != PartialUpdateMode.IGNORE_DEFAULTS;
});
}
@@ -661,7 +661,7 @@ class TestBufferedRecordMerger extends
SparkClientFunctionalTestHarness {
private BufferedRecordMerger<InternalRow>
createMerger(HoodieReaderContext<InternalRow> readerContext,
RecordMergeMode
mergeMode,
- PartialUpdateMode
partialUpdateMode) {
+
Option<PartialUpdateMode> partialUpdateModeOpt) {
return BufferedRecordMergerFactory.create(
readerContext,
mergeMode,
@@ -673,7 +673,7 @@ class TestBufferedRecordMerger extends
SparkClientFunctionalTestHarness {
Option.empty(), // payloadClass
READER_SCHEMA, // readerSchema
props, // props
- partialUpdateMode
+ partialUpdateModeOpt
);
}
@@ -690,7 +690,7 @@ class TestBufferedRecordMerger extends
SparkClientFunctionalTestHarness {
Option.empty(), // payloadClass
READER_SCHEMA, // readerSchema
props, // props
- PartialUpdateMode.KEEP_VALUES // partialUpdateMode
+ Option.of(PartialUpdateMode.IGNORE_DEFAULTS) // partialUpdateMode
);
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala
index b6ac8ad23f84..283964625047 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala
@@ -25,13 +25,13 @@ import org.apache.hudi.common.model.{AWSDmsAvroPayload,
EventTimeAvroPayload, Ho
import org.apache.hudi.common.model.DefaultHoodieRecordPayload.{DELETE_KEY,
DELETE_MARKER}
import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
HoodieTableVersion, PartialUpdateMode}
-import
org.apache.hudi.common.table.HoodieTableConfig.{DEBEZIUM_UNAVAILABLE_VALUE,
PARTIAL_UPDATE_CUSTOM_MARKER, RECORD_MERGE_PROPERTY_PREFIX}
+import
org.apache.hudi.common.table.HoodieTableConfig.{DEBEZIUM_UNAVAILABLE_VALUE,
PARTIAL_UPDATE_UNAVAILABLE_VALUE, RECORD_MERGE_PROPERTY_PREFIX}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper,
UpgradeDowngrade}
import org.apache.spark.sql.SaveMode
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
@@ -112,8 +112,8 @@ class TestEightToNineUpgrade extends
RecordLevelIndexTestBase {
assertEquals(HoodieTableVersion.EIGHT,
metaClient.getTableConfig.getTableVersion)
// The payload class should be maintained.
assertEquals(payloadClass, metaClient.getTableConfig.getPayloadClass)
- // The partial update mode should be NONE.
- assertEquals(PartialUpdateMode.NONE,
metaClient.getTableConfig.getPartialUpdateMode)
+ // The partial update mode should not be present
+ assertTrue(metaClient.getTableConfig.getPartialUpdateMode.isEmpty)
if
(payloadClass.equals("org.apache.hudi.common.model.EventTimeAvroPayload")) {
assertEquals(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
metaClient.getTableConfig.getRecordMergeStrategyId)
} else {
@@ -135,30 +135,33 @@ class TestEightToNineUpgrade extends
RecordLevelIndexTestBase {
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
metaClient.getTableConfig.getRecordMergeStrategyId)
assertEquals(RecordMergeMode.EVENT_TIME_ORDERING,
metaClient.getTableConfig.getRecordMergeMode)
- assertEquals(PartialUpdateMode.IGNORE_DEFAULTS,
metaClient.getTableConfig.getPartialUpdateMode)
+ assertEquals(PartialUpdateMode.IGNORE_DEFAULTS,
metaClient.getTableConfig.getPartialUpdateMode.get())
} else if
(payloadClass.equals(classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName))
{
assertEquals(
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
metaClient.getTableConfig.getRecordMergeStrategyId)
assertEquals(RecordMergeMode.COMMIT_TIME_ORDERING,
metaClient.getTableConfig.getRecordMergeMode)
- assertEquals(PartialUpdateMode.IGNORE_DEFAULTS,
metaClient.getTableConfig.getPartialUpdateMode)
+ assertEquals(PartialUpdateMode.IGNORE_DEFAULTS,
metaClient.getTableConfig.getPartialUpdateMode.get())
} else if
(payloadClass.equals(classOf[PostgresDebeziumAvroPayload].getName)) {
assertEquals(
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
metaClient.getTableConfig.getRecordMergeStrategyId)
assertEquals(RecordMergeMode.EVENT_TIME_ORDERING,
metaClient.getTableConfig.getRecordMergeMode)
- assertEquals(PartialUpdateMode.IGNORE_MARKERS,
metaClient.getTableConfig.getPartialUpdateMode)
- val customMarker =
metaClient.getTableConfig.getString(s"${RECORD_MERGE_PROPERTY_PREFIX}${PARTIAL_UPDATE_CUSTOM_MARKER}")
+ assertEquals(PartialUpdateMode.FILL_UNAVAILABLE,
metaClient.getTableConfig.getPartialUpdateMode.get())
+ val customMarker =
metaClient.getTableConfig.getString(s"${RECORD_MERGE_PROPERTY_PREFIX}${PARTIAL_UPDATE_UNAVAILABLE_VALUE}")
assertEquals(DEBEZIUM_UNAVAILABLE_VALUE, customMarker)
} else if (payloadClass.equals(classOf[AWSDmsAvroPayload].getName)) {
assertEquals(
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
metaClient.getTableConfig.getRecordMergeStrategyId)
assertEquals(RecordMergeMode.COMMIT_TIME_ORDERING,
metaClient.getTableConfig.getRecordMergeMode)
+ assertTrue(metaClient.getTableConfig.getPartialUpdateMode.isEmpty)
val deleteField =
metaClient.getTableConfig.getString(s"${RECORD_MERGE_PROPERTY_PREFIX}${DELETE_KEY}")
assertEquals(AWSDmsAvroPayload.OP_FIELD, deleteField)
val deleteMarker =
metaClient.getTableConfig.getString(s"${RECORD_MERGE_PROPERTY_PREFIX}${DELETE_MARKER}")
assertEquals(AWSDmsAvroPayload.DELETE_OPERATION_VALUE, deleteMarker)
+ } else {
+ assertTrue(metaClient.getTableConfig.getPartialUpdateMode.isEmpty)
}
}
}
@@ -172,7 +175,7 @@ object TestEightToNineUpgrade {
Arguments.of("COPY_ON_WRITE",
classOf[PostgresDebeziumAvroPayload].getName),
Arguments.of("COPY_ON_WRITE", classOf[AWSDmsAvroPayload].getName),
Arguments.of("MERGE_ON_READ", classOf[EventTimeAvroPayload].getName),
- Arguments.of("COPY_ON_WRITE", classOf[PartialUpdateAvroPayload].getName),
+ Arguments.of("MERGE_ON_READ", classOf[PartialUpdateAvroPayload].getName),
Arguments.of("MERGE_ON_READ",
classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName),
Arguments.of("MERGE_ON_READ",
classOf[PostgresDebeziumAvroPayload].getName),
Arguments.of("MERGE_ON_READ", classOf[AWSDmsAvroPayload].getName)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
index 1c98a849bb29..b99c1f4159d6 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
@@ -260,7 +260,7 @@ object TestPayloadDeprecationFlow {
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[PostgresDebeziumAvroPayload].getName,
HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_MARKERS",
- HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE
-> "__debezium_unavailable_value"),
Arguments.of(
"COPY_ON_WRITE",
@@ -322,7 +322,7 @@ object TestPayloadDeprecationFlow {
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[PostgresDebeziumAvroPayload].getName,
HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_MARKERS",
- HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE
-> "__debezium_unavailable_value"),
Arguments.of(
"MERGE_ON_READ",