This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 57f153c8d55 [HUDI-8554] Use COMMIT_TIME_ORDERING (#12304)
57f153c8d55 is described below
commit 57f153c8d55ea114dad2ef83a831bbb87a579f74
Author: Lin Liu <[email protected]>
AuthorDate: Thu Nov 21 21:51:36 2024 -0800
[HUDI-8554] Use COMMIT_TIME_ORDERING (#12304)
* Use commit time ordering
* address comments
---
.../apache/hudi/cli/commands/BootstrapCommand.java | 2 +-
.../hudi/OverwriteWithLatestSparkRecordMerger.java | 2 +-
.../hudi/BaseSparkInternalRowReaderContext.java | 4 +--
.../apache/hudi/common/config/RecordMergeMode.java | 2 +-
.../hudi/common/model/HoodieRecordMerger.java | 2 +-
.../hudi/common/model/HoodieRecordPayload.java | 2 +-
.../common/model/OverwriteWithLatestMerger.java | 2 +-
.../hudi/common/table/HoodieTableConfig.java | 26 +++++++-------
.../read/HoodieBaseFileGroupRecordBuffer.java | 8 ++---
.../HoodiePositionBasedFileGroupRecordBuffer.java | 2 +-
.../table/read/TestHoodieFileGroupReaderBase.java | 4 +--
.../hudi/common/table/TestHoodieTableConfig.java | 42 +++++++++++-----------
.../table/read/TestOverwriteWithLatestMerger.java | 6 ++--
.../hudi/hadoop/HiveHoodieReaderContext.java | 4 +--
.../OverwriteWithLatestHiveRecordMerger.java | 2 +-
...stHoodiePositionBasedFileGroupRecordBuffer.java | 6 ++--
.../hudi/functional/TestBasicSchemaEvolution.scala | 2 +-
.../hudi/utilities/streamer/HoodieStreamer.java | 2 +-
18 files changed, 60 insertions(+), 60 deletions(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/BootstrapCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/BootstrapCommand.java
index 57da6c7da79..fff8b0bfa87 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/BootstrapCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/BootstrapCommand.java
@@ -75,7 +75,7 @@ public class BootstrapCommand {
@ShellOption(value = {"--payloadClass"}, defaultValue = "",
help = "Payload Class (deprecated). Use `--merge-mode` to specify
the equivalent payload update behavior.") final String payloadClass,
@ShellOption(value = {"--merge-mode", "--record-merge-mode"},
defaultValue = "",
- help = "Merge mode to use. 'EVENT_TIME_ORDERING',
'OVERWRITE_WITH_LATEST', "
+ help = "Merge mode to use. 'EVENT_TIME_ORDERING',
'COMMIT_TIME_ORDERING', "
+ "or 'CUSTOM' if you want to set a custom merge strategy ID and
implementation.") final String recordMergeMode,
@ShellOption(value = {"--merge-strategy-id",
"--record-merge-strategy-id"}, defaultValue = "",
help = "ID of the merge strategy to use. Only set when using
'CUSTOM' merge mode") final String recordMergeStrategyId,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkRecordMerger.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkRecordMerger.java
index 7e6336a4e95..403f392ef1a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkRecordMerger.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkRecordMerger.java
@@ -35,7 +35,7 @@ public class OverwriteWithLatestSparkRecordMerger extends
HoodieSparkRecordMerge
@Override
public String getMergingStrategy() {
- return OVERWRITE_MERGE_STRATEGY_UUID;
+ return COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 7a057a509bc..6df203d3718 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -56,11 +56,11 @@ public abstract class BaseSparkInternalRowReaderContext
extends HoodieReaderCont
@Override
public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode,
String mergeStrategyId, String mergeImplClasses) {
// TODO(HUDI-7843):
- // get rid of event time and overwrite with latest. Just return
Option.empty
+ // get rid of event time and commit time ordering. Just return Option.empty
switch (mergeMode) {
case EVENT_TIME_ORDERING:
return Option.of(new DefaultSparkRecordMerger());
- case OVERWRITE_WITH_LATEST:
+ case COMMIT_TIME_ORDERING:
return Option.of(new OverwriteWithLatestSparkRecordMerger());
case CUSTOM:
default:
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/RecordMergeMode.java b/hudi-common/src/main/java/org/apache/hudi/common/config/RecordMergeMode.java
index 5f294728342..fa20d70094e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/RecordMergeMode.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/RecordMergeMode.java
@@ -25,7 +25,7 @@ import org.apache.hudi.common.util.StringUtils;
public enum RecordMergeMode {
@EnumFieldDescription("Using transaction time to merge records, i.e., the
record from later "
+ "transaction overwrites the earlier record with the same key.")
- OVERWRITE_WITH_LATEST,
+ COMMIT_TIME_ORDERING,
@EnumFieldDescription("Using event time as the ordering to merge records,
i.e., the record "
+ "with the larger event time overwrites the record with the smaller
event time on the "
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
index 424749d2797..9628e9ced24 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
@@ -47,7 +47,7 @@ public interface HoodieRecordMerger extends Serializable {
String DEFAULT_MERGE_STRATEGY_UUID = "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5";
// Always chooses the most recently written record
- String OVERWRITE_MERGE_STRATEGY_UUID =
"ce9acb64-bde0-424c-9b91-f6ebba25356d";
+ String COMMIT_TIME_BASED_MERGE_STRATEGY_UUID =
"ce9acb64-bde0-424c-9b91-f6ebba25356d";
// Use avro payload to merge records
String PAYLOAD_BASED_MERGE_STRATEGY_UUID =
"00000000-0000-0000-0000-000000000000";
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
index 5bb65b98180..39ff6ceb369 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
@@ -152,7 +152,7 @@ public interface HoodieRecordPayload<T extends
HoodieRecordPayload> extends Seri
default:
case EVENT_TIME_ORDERING:
return DefaultHoodieRecordPayload.class.getName();
- case OVERWRITE_WITH_LATEST:
+ case COMMIT_TIME_ORDERING:
return OverwriteWithLatestAvroPayload.class.getName();
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestMerger.java
index 4c10e9c3c25..90c12fc77ed 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestMerger.java
@@ -44,6 +44,6 @@ public class OverwriteWithLatestMerger implements
HoodieRecordMerger {
@Override
public String getMergingStrategy() {
- return OVERWRITE_MERGE_STRATEGY_UUID;
+ return COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index b934f0e811e..9a0773eedd7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -78,7 +78,7 @@ import java.util.stream.Collectors;
import static
org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY;
import static org.apache.hudi.common.config.RecordMergeMode.CUSTOM;
import static
org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
-import static
org.apache.hudi.common.config.RecordMergeMode.OVERWRITE_WITH_LATEST;
+import static
org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING;
import static
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.DATE_TIME_PARSER;
import static
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.INPUT_TIME_UNIT;
import static
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_INPUT_DATE_FORMAT;
@@ -89,7 +89,7 @@ import static
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAM
import static
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_TIMEZONE_FORMAT;
import static
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD;
import static
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID;
-import static
org.apache.hudi.common.model.HoodieRecordMerger.OVERWRITE_MERGE_STRATEGY_UUID;
+import static
org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
import static
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
import static org.apache.hudi.common.util.ConfigUtils.fetchConfigs;
import static org.apache.hudi.common.util.ConfigUtils.recoverIfNeeded;
@@ -740,8 +740,8 @@ public class HoodieTableConfig extends HoodieConfig {
}
// set merger strategy based on merge mode
- if (inferredRecordMergeMode == OVERWRITE_WITH_LATEST) {
- inferredRecordMergeStrategyId = OVERWRITE_MERGE_STRATEGY_UUID;
+ if (inferredRecordMergeMode == COMMIT_TIME_ORDERING) {
+ inferredRecordMergeStrategyId =
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
} else if (inferredRecordMergeMode == EVENT_TIME_ORDERING) {
inferredRecordMergeStrategyId = DEFAULT_MERGE_STRATEGY_UUID;
} else {
@@ -754,10 +754,10 @@ public class HoodieTableConfig extends HoodieConfig {
checkArgument(recordMergeMode == null || recordMergeMode ==
EVENT_TIME_ORDERING,
"Default merge strategy ID can only be used with the merge mode
of EVENT_TIME_ORDERING");
inferredRecordMergeMode = EVENT_TIME_ORDERING;
- } else if
(recordMergeStrategyId.equals(OVERWRITE_MERGE_STRATEGY_UUID)) {
- checkArgument(recordMergeMode == null || recordMergeMode ==
OVERWRITE_WITH_LATEST,
- "Overwrite with latest merger strategy ID can only be used with
the merge mode of OVERWRITE_WITH_LATEST");
- inferredRecordMergeMode = OVERWRITE_WITH_LATEST;
+ } else if
(recordMergeStrategyId.equals(COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)) {
+ checkArgument(recordMergeMode == null || recordMergeMode ==
COMMIT_TIME_ORDERING,
+ "Commit time ordering merger strategy ID can only be used with
the merge mode of COMMIT_TIME_ORDERING");
+ inferredRecordMergeMode = COMMIT_TIME_ORDERING;
} else {
checkArgument(!recordMergeStrategyId.equals(PAYLOAD_BASED_MERGE_STRATEGY_UUID),
"Payload based strategy should only be used if you have a custom
payload class set");
@@ -786,18 +786,18 @@ public class HoodieTableConfig extends HoodieConfig {
// currently for the custom case. This block will be moved below and
check if the payload class name is dummy
checkArgument(recordMergeMode == null || recordMergeMode == CUSTOM,
"Record merge mode, payload class, and merge strategy are in an illegal
configuration");
checkArgument(
- !recordMergeStrategyId.equals(OVERWRITE_MERGE_STRATEGY_UUID) &&
!recordMergeStrategyId.equals(PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+
!recordMergeStrategyId.equals(COMMIT_TIME_BASED_MERGE_STRATEGY_UUID) &&
!recordMergeStrategyId.equals(PAYLOAD_BASED_MERGE_STRATEGY_UUID),
"Record merger strategy is incompatible with payload class");
inferredRecordMergeMode = CUSTOM;
inferredRecordMergeStrategyId = recordMergeStrategyId;
}
} else if
(payloadClassName.equals(OverwriteWithLatestAvroPayload.class.getName())) {
// strategy and merge mode must be unset or align with overwrite
- checkArgument(isNullOrEmpty(recordMergeStrategyId) ||
recordMergeStrategyId.equals(OVERWRITE_MERGE_STRATEGY_UUID),
+ checkArgument(isNullOrEmpty(recordMergeStrategyId) ||
recordMergeStrategyId.equals(COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
"Record merge strategy cannot be set if a merge payload is used");
- checkArgument(recordMergeMode == null || recordMergeMode ==
OVERWRITE_WITH_LATEST, "Only overwrite with latest record merge mode can be
used with overwrite payload");
- inferredRecordMergeMode = OVERWRITE_WITH_LATEST;
- inferredRecordMergeStrategyId = OVERWRITE_MERGE_STRATEGY_UUID;
+ checkArgument(recordMergeMode == null || recordMergeMode ==
COMMIT_TIME_ORDERING, "Only commit time ordering merge mode can be used with
overwrite payload");
+ inferredRecordMergeMode = COMMIT_TIME_ORDERING;
+ inferredRecordMergeStrategyId = COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
} else {
// using custom avro payload
checkArgument(isNullOrEmpty(recordMergeStrategyId) ||
recordMergeStrategyId.equals(PAYLOAD_BASED_MERGE_STRATEGY_UUID),
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index 10902887849..e0014460916 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -112,7 +112,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
this.payloadClass = Option.empty();
}
this.orderingFieldName =
Option.ofNullable(ConfigUtils.getOrderingField(props)).orElseGet(() ->
hoodieTableMetaClient.getTableConfig().getPreCombineField());
- this.orderingFieldTypeOpt = recordMergeMode ==
RecordMergeMode.OVERWRITE_WITH_LATEST ? Option.empty() :
AvroSchemaUtils.findNestedFieldType(readerSchema, this.orderingFieldName);
+ this.orderingFieldTypeOpt = recordMergeMode ==
RecordMergeMode.COMMIT_TIME_ORDERING ? Option.empty() :
AvroSchemaUtils.findNestedFieldType(readerSchema, this.orderingFieldName);
this.orderingFieldDefault = orderingFieldTypeOpt.map(type ->
readerContext.castValue(0, type)).orElse(0);
this.props = props;
this.internalSchema = readerContext.getSchemaHandler().getInternalSchema();
@@ -218,7 +218,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
return Option.empty();
} else {
switch (recordMergeMode) {
- case OVERWRITE_WITH_LATEST:
+ case COMMIT_TIME_ORDERING:
return Option.empty();
case EVENT_TIME_ORDERING:
Comparable existingOrderingValue = readerContext.getOrderingValue(
@@ -295,7 +295,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
Pair<Option<T>,
Map<String, Object>> existingRecordMetadataPair) {
if (existingRecordMetadataPair != null) {
switch (recordMergeMode) {
- case OVERWRITE_WITH_LATEST:
+ case COMMIT_TIME_ORDERING:
return Option.empty();
case EVENT_TIME_ORDERING:
case CUSTOM:
@@ -400,7 +400,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
return Option.empty();
} else {
switch (recordMergeMode) {
- case OVERWRITE_WITH_LATEST:
+ case COMMIT_TIME_ORDERING:
return newer;
case EVENT_TIME_ORDERING:
Comparable oldOrderingValue = readerContext.getOrderingValue(
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index 517758f6f92..be842cad4c4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -173,7 +173,7 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T>
extends HoodieKeyBasedF
}
switch (recordMergeMode) {
- case OVERWRITE_WITH_LATEST:
+ case COMMIT_TIME_ORDERING:
for (Long recordPosition : recordPositions) {
records.putIfAbsent(recordPosition,
Pair.of(Option.empty(),
readerContext.generateMetadataForRecord(null, "", orderingFieldDefault,
orderingFieldTypeOpt)));
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index ef3a04398b6..8d968c8fad6 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -103,8 +103,8 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
private static Stream<Arguments> testArguments() {
return Stream.of(
- arguments(RecordMergeMode.OVERWRITE_WITH_LATEST, "avro"),
- arguments(RecordMergeMode.OVERWRITE_WITH_LATEST, "parquet"),
+ arguments(RecordMergeMode.COMMIT_TIME_ORDERING, "avro"),
+ arguments(RecordMergeMode.COMMIT_TIME_ORDERING, "parquet"),
arguments(RecordMergeMode.EVENT_TIME_ORDERING, "avro"),
arguments(RecordMergeMode.EVENT_TIME_ORDERING, "parquet"),
arguments(RecordMergeMode.CUSTOM, "avro"),
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index 381fa99ff38..18c33253265 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -56,9 +56,9 @@ import java.util.stream.Stream;
import static org.apache.hudi.common.config.RecordMergeMode.CUSTOM;
import static
org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
-import static
org.apache.hudi.common.config.RecordMergeMode.OVERWRITE_WITH_LATEST;
+import static
org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING;
import static
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID;
-import static
org.apache.hudi.common.model.HoodieRecordMerger.OVERWRITE_MERGE_STRATEGY_UUID;
+import static
org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
import static
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
@@ -317,16 +317,16 @@ public class TestHoodieTableConfig extends
HoodieCommonTestHarness {
arguments(null, null, DEFAULT_MERGE_STRATEGY_UUID, false,
EVENT_TIME_ORDERING, defaultPayload, DEFAULT_MERGE_STRATEGY_UUID),
//test legal overwrite combos
- arguments(OVERWRITE_WITH_LATEST, null, null, false,
OVERWRITE_WITH_LATEST, overwritePayload, OVERWRITE_MERGE_STRATEGY_UUID),
- arguments(OVERWRITE_WITH_LATEST, overwritePayload, null, false,
OVERWRITE_WITH_LATEST, overwritePayload, OVERWRITE_MERGE_STRATEGY_UUID),
- arguments(OVERWRITE_WITH_LATEST, overwritePayload,
OVERWRITE_MERGE_STRATEGY_UUID, false, OVERWRITE_WITH_LATEST, overwritePayload,
- OVERWRITE_MERGE_STRATEGY_UUID),
- arguments(OVERWRITE_WITH_LATEST, null, OVERWRITE_MERGE_STRATEGY_UUID,
false, OVERWRITE_WITH_LATEST, overwritePayload,
- OVERWRITE_MERGE_STRATEGY_UUID),
- arguments(null, overwritePayload, null, false, OVERWRITE_WITH_LATEST,
overwritePayload, OVERWRITE_MERGE_STRATEGY_UUID),
- arguments(null, overwritePayload, OVERWRITE_MERGE_STRATEGY_UUID,
false, OVERWRITE_WITH_LATEST, overwritePayload,
- OVERWRITE_MERGE_STRATEGY_UUID),
- arguments(null, null, OVERWRITE_MERGE_STRATEGY_UUID, false,
OVERWRITE_WITH_LATEST, overwritePayload, OVERWRITE_MERGE_STRATEGY_UUID),
+ arguments(COMMIT_TIME_ORDERING, null, null, false,
COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ arguments(COMMIT_TIME_ORDERING, overwritePayload, null, false,
COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ arguments(COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, false, COMMIT_TIME_ORDERING,
overwritePayload,
+ COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ arguments(COMMIT_TIME_ORDERING, null,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, false, COMMIT_TIME_ORDERING,
overwritePayload,
+ COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ arguments(null, overwritePayload, null, false, COMMIT_TIME_ORDERING,
overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ arguments(null, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, false, COMMIT_TIME_ORDERING,
overwritePayload,
+ COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ arguments(null, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, false,
COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
//test legal custom payload combos
arguments(CUSTOM, customPayload, null, false, CUSTOM, customPayload,
PAYLOAD_BASED_MERGE_STRATEGY_UUID),
@@ -346,25 +346,25 @@ public class TestHoodieTableConfig extends
HoodieCommonTestHarness {
//test illegal combos
arguments(EVENT_TIME_ORDERING, overwritePayload, null, true, null,
null, null),
arguments(EVENT_TIME_ORDERING, customPayload, null, true, null, null,
null),
- arguments(EVENT_TIME_ORDERING, null, OVERWRITE_MERGE_STRATEGY_UUID,
true, null, null, null),
+ arguments(EVENT_TIME_ORDERING, null,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
arguments(EVENT_TIME_ORDERING, null, customStrategy, true, null, null,
null),
arguments(EVENT_TIME_ORDERING, null,
PAYLOAD_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
- arguments(OVERWRITE_WITH_LATEST, defaultPayload, null, true, null,
null, null),
- arguments(OVERWRITE_WITH_LATEST, customPayload, null, true, null,
null, null),
- arguments(OVERWRITE_WITH_LATEST, null, DEFAULT_MERGE_STRATEGY_UUID,
true, null, null, null),
- arguments(OVERWRITE_WITH_LATEST, null, customStrategy, true, null,
null, null),
- arguments(OVERWRITE_WITH_LATEST, null,
PAYLOAD_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
+ arguments(COMMIT_TIME_ORDERING, defaultPayload, null, true, null,
null, null),
+ arguments(COMMIT_TIME_ORDERING, customPayload, null, true, null, null,
null),
+ arguments(COMMIT_TIME_ORDERING, null, DEFAULT_MERGE_STRATEGY_UUID,
true, null, null, null),
+ arguments(COMMIT_TIME_ORDERING, null, customStrategy, true, null,
null, null),
+ arguments(COMMIT_TIME_ORDERING, null,
PAYLOAD_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
arguments(CUSTOM, defaultPayload, null, true, null, null, null),
arguments(CUSTOM, overwritePayload, null, true, null, null, null),
arguments(CUSTOM, null, DEFAULT_MERGE_STRATEGY_UUID, true, null, null,
null),
- arguments(CUSTOM, null, OVERWRITE_MERGE_STRATEGY_UUID, true, null,
null, null),
+ arguments(CUSTOM, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, true,
null, null, null),
arguments(CUSTOM, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID,
true, null, null, null),
arguments(CUSTOM, overwritePayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID,
true, null, null, null),
- arguments(CUSTOM, defaultPayload, OVERWRITE_MERGE_STRATEGY_UUID, true,
null, null, null),
+ arguments(CUSTOM, defaultPayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
arguments(CUSTOM, overwritePayload, DEFAULT_MERGE_STRATEGY_UUID, true,
null, null, null),
arguments(null, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID,
true, null, null, null),
arguments(null, overwritePayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID,
true, null, null, null),
- arguments(null, defaultPayload, OVERWRITE_MERGE_STRATEGY_UUID, true,
null, null, null),
+ arguments(null, defaultPayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
true, null, null, null),
arguments(null, overwritePayload, DEFAULT_MERGE_STRATEGY_UUID, true,
null, null, null));
return arguments;
}
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestOverwriteWithLatestMerger.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestOverwriteWithLatestMerger.java
index 20e4df9a091..a39809ced14 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestOverwriteWithLatestMerger.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestOverwriteWithLatestMerger.java
@@ -59,8 +59,8 @@ public class TestOverwriteWithLatestMerger extends
HoodieFileGroupReaderTestHarn
@Override
protected Properties getMetaProps() {
Properties metaProps = super.getMetaProps();
- metaProps.setProperty(HoodieTableConfig.RECORD_MERGE_MODE.key(),
RecordMergeMode.OVERWRITE_WITH_LATEST.name());
- metaProps.setProperty(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(),
HoodieRecordMerger.OVERWRITE_MERGE_STRATEGY_UUID);
+ metaProps.setProperty(HoodieTableConfig.RECORD_MERGE_MODE.key(),
RecordMergeMode.COMMIT_TIME_ORDERING.name());
+ metaProps.setProperty(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(),
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
metaProps.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(),
OverwriteWithLatestAvroPayload.class.getName());
return metaProps;
}
@@ -71,7 +71,7 @@ public class TestOverwriteWithLatestMerger extends
HoodieFileGroupReaderTestHarn
readerContext = new HoodieTestReaderContext(
Option.of(merger),
Option.of(OverwriteWithLatestAvroPayload.class.getName()));
- properties.setProperty("hoodie.write.record.merge.mode",
RecordMergeMode.OVERWRITE_WITH_LATEST.name());
+ properties.setProperty("hoodie.write.record.merge.mode",
RecordMergeMode.COMMIT_TIME_ORDERING.name());
// -------------------------------------------------------------
// The test logic is as follows:
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index f0d28fff00d..5a356a8d086 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -171,11 +171,11 @@ public class HiveHoodieReaderContext extends
HoodieReaderContext<ArrayWritable>
@Override
public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode,
String mergeStrategyId, String mergeImplClasses) {
// TODO(HUDI-7843):
- // get rid of event time and overwrite with latest. Just return
Option.empty
+ // get rid of event time and commit time ordering. Just return Option.empty
switch (mergeMode) {
case EVENT_TIME_ORDERING:
return Option.of(new DefaultHiveRecordMerger());
- case OVERWRITE_WITH_LATEST:
+ case COMMIT_TIME_ORDERING:
return Option.of(new OverwriteWithLatestHiveRecordMerger());
case CUSTOM:
default:
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/OverwriteWithLatestHiveRecordMerger.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/OverwriteWithLatestHiveRecordMerger.java
index f60f62a2b49..a19a18118f0 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/OverwriteWithLatestHiveRecordMerger.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/OverwriteWithLatestHiveRecordMerger.java
@@ -34,7 +34,7 @@ import java.io.IOException;
public class OverwriteWithLatestHiveRecordMerger extends
HoodieHiveRecordMerger {
@Override
public String getMergingStrategy() {
- return OVERWRITE_MERGE_STRATEGY_UUID;
+ return COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
}
@Override
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
index b4b924e89ec..bed0d12d0e8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
@@ -78,7 +78,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer
extends TestHoodieFile
writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),
"parquet");
writeConfigs.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
"_row_key");
writeConfigs.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),
"partition_path");
-
writeConfigs.put("hoodie.datasource.write.precombine.field",mergeMode.equals(RecordMergeMode.OVERWRITE_WITH_LATEST)
? "" : "timestamp");
+
writeConfigs.put("hoodie.datasource.write.precombine.field",mergeMode.equals(RecordMergeMode.COMMIT_TIME_ORDERING)
? "" : "timestamp");
writeConfigs.put("hoodie.payload.ordering.field", "timestamp");
writeConfigs.put(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "hoodie_test");
writeConfigs.put("hoodie.insert.shuffle.parallelism", "4");
@@ -180,7 +180,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer
extends TestHoodieFile
@Test
public void testProcessDeleteBlockWithPositions() throws Exception {
- prepareBuffer(RecordMergeMode.OVERWRITE_WITH_LATEST);
+ prepareBuffer(RecordMergeMode.COMMIT_TIME_ORDERING);
HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions();
buffer.processDeleteBlock(deleteBlock);
assertEquals(50, buffer.getLogRecords().size());
@@ -199,7 +199,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer
extends TestHoodieFile
@Test
public void testProcessDeleteBlockWithoutPositions() throws Exception {
- prepareBuffer(RecordMergeMode.OVERWRITE_WITH_LATEST);
+ prepareBuffer(RecordMergeMode.COMMIT_TIME_ORDERING);
HoodieDeleteBlock deleteBlock = getDeleteBlockWithoutPositions();
buffer.processDeleteBlock(deleteBlock);
assertEquals(50, buffer.getLogRecords().size());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
index a0d902fcdc2..34fc7c8d72b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
@@ -51,7 +51,7 @@ class TestBasicSchemaEvolution extends
HoodieSparkClientTestBase with ScalaAsser
"hoodie.bulkinsert.shuffle.parallelism" -> "2",
"hoodie.delete.shuffle.parallelism" -> "1",
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key() -> "true",
- HoodieWriteConfig.RECORD_MERGE_MODE.key() ->
RecordMergeMode.OVERWRITE_WITH_LATEST.name(),
+ HoodieWriteConfig.RECORD_MERGE_MODE.key() ->
RecordMergeMode.COMMIT_TIME_ORDERING.name(),
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index a99aa372d87..a725cab6023 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -270,7 +270,7 @@ public class HoodieStreamer implements Serializable {
public String sourceOrderingField = "ts";
@Parameter(names = {"--payload-class"}, description = "Deprecated. "
- + "Use --merge-mode for overwrite with latest or event time merging. "
+ + "Use --merge-mode for commit time or event time merging. "
+ "Subclass of HoodieRecordPayload, that works off a GenericRecord.
Implement your own, if you want to do something "
+ "other than overwriting existing value")
public String payloadClassName = null;