This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 4bea55a6441 [HUDI-8413] Change the default merge mode to COMMIT_TIME_ORDERING (#12600)
4bea55a6441 is described below
commit 4bea55a644154618b3a25dad2618b13a400dfcce
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Jan 28 08:19:24 2025 -0800
[HUDI-8413] Change the default merge mode to COMMIT_TIME_ORDERING (#12600)
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 +-
.../upgrade/EightToSevenDowngradeHandler.java | 4 +-
.../org/apache/hudi/DefaultSparkRecordMerger.java | 2 +-
.../apache/hudi/common/config/ConfigProperty.java | 1 -
.../hudi/common/model/HoodieRecordMerger.java | 24 ++-
.../hudi/common/model/HoodieRecordPayload.java | 6 +-
.../hudi/common/table/HoodieTableConfig.java | 184 ++++++++++---------
.../hudi/common/table/HoodieTableMetaClient.java | 3 +-
.../apache/hudi/configuration/FlinkOptions.java | 2 +-
.../apache/hudi/streamer/FlinkStreamerConfig.java | 2 +-
.../hudi/common/table/TestHoodieTableConfig.java | 201 ++++++++++++++-------
.../hudi/common/table/read/TestCustomMerger.java | 4 +-
.../common/table/read/TestEventTimeMerging.java | 4 -
.../table/read/TestOverwriteWithLatestMerger.java | 2 -
.../hudi/hadoop/DefaultHiveRecordMerger.java | 2 +-
.../scala/org/apache/hudi/DataSourceOptions.scala | 3 -
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 39 ++--
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 6 +-
.../functional/TestHoodieBackedMetadata.java | 10 +-
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 8 +-
.../sql/hudi/common/TestHoodieOptionConfig.scala | 4 +-
.../apache/spark/sql/hudi/common/TestSqlConf.scala | 4 +-
.../MaxwellJsonKafkaSourcePostProcessor.java | 3 +-
.../hudi/utilities/streamer/BootstrapExecutor.java | 3 +-
.../hudi/utilities/streamer/HoodieStreamer.java | 7 +-
.../utilities/streamer/HoodieStreamerUtils.java | 4 +-
.../deltastreamer/HoodieDeltaStreamerTestBase.java | 3 +-
.../deltastreamer/TestHoodieDeltaStreamer.java | 4 +-
28 files changed, 329 insertions(+), 214 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index a3f903f1c86..99d46ad08e4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -166,7 +166,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   public static final ConfigProperty<String> PRECOMBINE_FIELD_NAME = ConfigProperty
       .key("hoodie.datasource.write.precombine.field")
-      .defaultValue("ts")
+      .noDefaultValue()
       .withDocumentation("Field used in preCombining before actual write. When two records have the same key value, "
           + "we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..)");
@@ -181,7 +181,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   // This ConfigProperty is also used in SQL options which expect String type
   public static final ConfigProperty<String> RECORD_MERGE_MODE = ConfigProperty
       .key("hoodie.write.record.merge.mode")
-      .defaultValue(RecordMergeMode.EVENT_TIME_ORDERING.name())
+      .noDefaultValue("COMMIT_TIME_ORDERING if ordering field is not set; EVENT_TIME_ORDERING if ordering field is set")
      .sinceVersion("1.0.0")
      .withDocumentation(RecordMergeMode.class);
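[For readers tracking the user-facing effect of the two hunks above: the precombine field no longer defaults to "ts", and the write-path merge mode no longer has a static default. A minimal, hedged sketch of the two resulting configurations, using the config keys shown above; the field name "ts" is illustrative:

    import org.apache.hudi.common.config.TypedProperties;

    class MergeModeDefaultsSketch {
      static TypedProperties commitTimeDefault() {
        // No precombine field set: the inferred merge mode is COMMIT_TIME_ORDERING.
        return new TypedProperties();
      }

      static TypedProperties eventTimeDefault() {
        // Precombine field set: the inferred merge mode is EVENT_TIME_ORDERING.
        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.datasource.write.precombine.field", "ts");
        return props;
      }
    }
]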
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
index 6193067eb84..4980fcec4f7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
@@ -166,7 +166,9 @@ public class EightToSevenDowngradeHandler implements DowngradeHandler {
   static void unsetRecordMergeMode(HoodieTableConfig tableConfig, Map<ConfigProperty, String> tablePropsToAdd) {
     Triple<RecordMergeMode, String, String> mergingConfigs =
-        HoodieTableConfig.inferCorrectMergingBehavior(tableConfig.getRecordMergeMode(), tableConfig.getPayloadClass(), tableConfig.getRecordMergeStrategyId());
+        HoodieTableConfig.inferCorrectMergingBehavior(
+            tableConfig.getRecordMergeMode(), tableConfig.getPayloadClass(),
+            tableConfig.getRecordMergeStrategyId(), tableConfig.getPreCombineField());
     if (StringUtils.nonEmpty(mergingConfigs.getMiddle())) {
       tablePropsToAdd.put(HoodieTableConfig.PAYLOAD_CLASS_NAME, mergingConfigs.getMiddle());
     }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
index 33bd74f2bf6..652b016cf50 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
@@ -40,7 +40,7 @@ public class DefaultSparkRecordMerger extends HoodieSparkRecordMerger {
   @Override
   public String getMergingStrategy() {
-    return HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID;
+    return HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
   }
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java
index aa2cf642309..af8fae69d41 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java
@@ -254,7 +254,6 @@ public class ConfigProperty<T> implements Serializable {
   }
   public <T> ConfigProperty<T> defaultValue(T value, String docOnDefaultValue) {
-    Objects.requireNonNull(value);
     Objects.requireNonNull(docOnDefaultValue);
     ConfigProperty<T> configProperty = new ConfigProperty<>(key, value, docOnDefaultValue, "", Option.empty(), Option.empty(), Option.empty(), Collections.emptySet(), false);
     return configProperty;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
index 9628e9ced24..f6d1cb1db3a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.model;
 import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -35,6 +36,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import static org.apache.hudi.common.util.StringUtils.nonEmpty;
+
 /**
  * HoodieMerge defines how to merge two records. It is a stateless component.
  * It can implement the merging logic of HoodieRecord of different engines
@@ -44,7 +47,7 @@ import java.util.List;
 public interface HoodieRecordMerger extends Serializable {
   // Uses event time ordering to determine which record is chosen
-  String DEFAULT_MERGE_STRATEGY_UUID = "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5";
+  String EVENT_TIME_BASED_MERGE_STRATEGY_UUID = "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5";
   // Always chooses the most recently written record
   String COMMIT_TIME_BASED_MERGE_STRATEGY_UUID = "ce9acb64-bde0-424c-9b91-f6ebba25356d";
@@ -183,4 +186,23 @@ public interface HoodieRecordMerger extends Serializable {
    */
   String getMergingStrategy();
+  static String getRecordMergeStrategyId(RecordMergeMode mergeMode,
+                                         String payloadClassName,
+                                         String recordMergeStrategyId) {
+    switch (mergeMode) {
+      case COMMIT_TIME_ORDERING:
+        return COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
+      case EVENT_TIME_ORDERING:
+        return EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
+      case CUSTOM:
+      default:
+        if (nonEmpty(recordMergeStrategyId)) {
+          return recordMergeStrategyId;
+        }
+        if (nonEmpty(payloadClassName)) {
+          return PAYLOAD_BASED_MERGE_STRATEGY_UUID;
+        }
+        return null;
+    }
+  }
 }
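[A short usage sketch of the new static helper added above; for the two built-in modes the payload class and strategy ID arguments are ignored, and for CUSTOM an explicit strategy ID takes precedence over the payload-based fallback. The custom payload class name below is hypothetical:

    import org.apache.hudi.common.config.RecordMergeMode;
    import org.apache.hudi.common.model.HoodieRecordMerger;

    class StrategyIdSketch {
      static void demo() {
        // Resolves to COMMIT_TIME_BASED_MERGE_STRATEGY_UUID ("ce9acb64-bde0-424c-9b91-f6ebba25356d").
        String commitTimeId = HoodieRecordMerger.getRecordMergeStrategyId(
            RecordMergeMode.COMMIT_TIME_ORDERING, null, null);
        // CUSTOM with only a payload class falls back to PAYLOAD_BASED_MERGE_STRATEGY_UUID;
        // with neither a strategy ID nor a payload class, the helper returns null.
        String customId = HoodieRecordMerger.getRecordMergeStrategyId(
            RecordMergeMode.CUSTOM, "com.example.MyPayload", null);
      }
    }
]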
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
index 39ff6ceb369..543562da375 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
@@ -146,14 +146,16 @@ public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends Seri
     return 0;
   }
-  static String getAvroPayloadForMergeMode(RecordMergeMode mergeMode) {
+  static String getAvroPayloadForMergeMode(RecordMergeMode mergeMode, String payloadClassName) {
     switch (mergeMode) {
       //TODO: After we have merge mode working for writing, we should have a dummy payload that will throw exception when used
-      default:
       case EVENT_TIME_ORDERING:
         return DefaultHoodieRecordPayload.class.getName();
       case COMMIT_TIME_ORDERING:
         return OverwriteWithLatestAvroPayload.class.getName();
+      case CUSTOM:
+      default:
+        return payloadClassName;
     }
   }
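[With the added parameter, the CUSTOM (and default) branch now echoes the configured payload class instead of falling through to the event-time payload. A sketch of the new behavior; the custom class name is hypothetical:

    import org.apache.hudi.common.config.RecordMergeMode;
    import org.apache.hudi.common.model.HoodieRecordPayload;

    class PayloadForModeSketch {
      static void demo() {
        // Still resolves to org.apache.hudi.common.model.DefaultHoodieRecordPayload.
        String eventTime = HoodieRecordPayload.getAvroPayloadForMergeMode(
            RecordMergeMode.EVENT_TIME_ORDERING, null);
        // CUSTOM now returns whatever payload class the user configured.
        String custom = HoodieRecordPayload.getAvroPayloadForMergeMode(
            RecordMergeMode.CUSTOM, "com.example.MyPayload");
      }
    }
]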
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index b225bacce0c..1b85a1707ae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.BootstrapIndexType;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieTimelineTimeZone;
@@ -76,9 +77,9 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY;
+import static org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING;
 import static org.apache.hudi.common.config.RecordMergeMode.CUSTOM;
 import static org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
-import static org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING;
 import static org.apache.hudi.common.config.TimestampKeyGeneratorConfig.DATE_TIME_PARSER;
 import static org.apache.hudi.common.config.TimestampKeyGeneratorConfig.INPUT_TIME_UNIT;
 import static org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_INPUT_DATE_FORMAT;
@@ -88,13 +89,14 @@ import static org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAM
 import static org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT;
 import static org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_TIMEZONE_FORMAT;
 import static org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD;
-import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
+import static org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.util.ConfigUtils.fetchConfigs;
 import static org.apache.hudi.common.util.ConfigUtils.recoverIfNeeded;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.common.util.StringUtils.nonEmpty;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 @Immutable
@@ -198,7 +200,8 @@ public class HoodieTableConfig extends HoodieConfig {
   public static final ConfigProperty<RecordMergeMode> RECORD_MERGE_MODE = ConfigProperty
       .key("hoodie.record.merge.mode")
-      .defaultValue(RecordMergeMode.EVENT_TIME_ORDERING)
+      .defaultValue((RecordMergeMode) null,
+          "COMMIT_TIME_ORDERING if precombine is not set; EVENT_TIME_ORDERING if precombine is set")
       .sinceVersion("1.0.0")
       .withDocumentation(RecordMergeMode.class);
@@ -736,7 +739,7 @@ public class HoodieTableConfig extends HoodieConfig {
   }
   public RecordMergeMode getRecordMergeMode() {
-    return RecordMergeMode.getValue(getStringOrDefault(RECORD_MERGE_MODE));
+    return RecordMergeMode.getValue(getString(RECORD_MERGE_MODE));
   }
/**
@@ -754,94 +757,113 @@ public class HoodieTableConfig extends HoodieConfig {
    * Infers the merging behavior based on what the user sets (or doesn't set).
    * Validates that the user has not set an illegal combination of configs
    */
-  public static Triple<RecordMergeMode, String, String> inferCorrectMergingBehavior(RecordMergeMode recordMergeMode, String payloadClassName,
-                                                                                    String recordMergeStrategyId) {
+  public static Triple<RecordMergeMode, String, String> inferCorrectMergingBehavior(RecordMergeMode recordMergeMode,
+                                                                                    String payloadClassName,
+                                                                                    String recordMergeStrategyId,
+                                                                                    String orderingFieldName) {
     RecordMergeMode inferredRecordMergeMode;
     String inferredPayloadClassName;
     String inferredRecordMergeStrategyId;
-    if (isNullOrEmpty(payloadClassName)) {
-      if (isNullOrEmpty(recordMergeStrategyId)) {
-        // no payload class name or merge strategy ID. If nothing is set then we default. User cannot set custom because no payload or strategy is set
-        checkArgument(recordMergeMode != RecordMergeMode.CUSTOM,
-            "Custom merge mode should only be used if you set a payload class or merge strategy ID");
-        if (recordMergeMode == null) {
-          inferredRecordMergeMode = RECORD_MERGE_MODE.defaultValue();
-        } else {
-          inferredRecordMergeMode = recordMergeMode;
-        }
+    // Inferring record merge mode
+    if (isNullOrEmpty(payloadClassName) && isNullOrEmpty(recordMergeStrategyId)) {
+      // If nothing is set on record merge mode, payload class, or record merge strategy ID,
+      // use the default merge mode determined by whether the ordering field name is set.
+      inferredRecordMergeMode = recordMergeMode != null
+          ? recordMergeMode
+          : (isNullOrEmpty(orderingFieldName) ? COMMIT_TIME_ORDERING : EVENT_TIME_ORDERING);
+    } else {
+      // Infer the merge mode from either the payload class or record merge strategy ID
+      RecordMergeMode modeBasedOnPayload = inferRecordMergeModeFromPayloadClass(payloadClassName);
+      RecordMergeMode modeBasedOnStrategyId = inferRecordMergeModeFromMergeStrategyId(recordMergeStrategyId);
+      checkArgument(modeBasedOnPayload != null || modeBasedOnStrategyId != null,
+          String.format("Cannot infer record merge mode from payload class (%s) or record merge "
+              + "strategy ID (%s).", payloadClassName, recordMergeStrategyId));
+      // TODO(HUDI-8925): once payload class name is not required, remove the check on
+      // modeBasedOnStrategyId
+      if (modeBasedOnStrategyId != CUSTOM && modeBasedOnPayload != null && modeBasedOnStrategyId != null) {
+        checkArgument(modeBasedOnPayload.equals(modeBasedOnStrategyId),
+            String.format("Configured payload class (%s) and record merge strategy ID (%s) conflict "
+                + "with each other. Please only set one of them in the write config.",
+                payloadClassName, recordMergeStrategyId));
+      }
+      inferredRecordMergeMode = modeBasedOnStrategyId != null ? modeBasedOnStrategyId : modeBasedOnPayload;
+    }
+    if (recordMergeMode != null) {
+      checkArgument(inferredRecordMergeMode == recordMergeMode,
+          String.format("Configured record merge mode (%s) is inconsistent with payload class (%s) "
+              + "or record merge strategy ID (%s) configured. Please revisit the configs.",
+              recordMergeMode, payloadClassName, recordMergeStrategyId));
+    }
-      // set merger strategy based on merge mode
-      if (inferredRecordMergeMode == COMMIT_TIME_ORDERING) {
-        inferredRecordMergeStrategyId = COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
-      } else if (inferredRecordMergeMode == EVENT_TIME_ORDERING) {
-        inferredRecordMergeStrategyId = DEFAULT_MERGE_STRATEGY_UUID;
-      } else {
-        throw new IllegalStateException("Merge Mode: '" + inferredRecordMergeMode + "' has not been fully implemented.");
-      }
-    } else {
-      // no payload class but merge strategy ID is set. Need to validate that strategy and merge mode align if both are set
-      inferredRecordMergeStrategyId = recordMergeStrategyId;
-      if (recordMergeStrategyId.equals(DEFAULT_MERGE_STRATEGY_UUID)) {
-        checkArgument(recordMergeMode == null || recordMergeMode == EVENT_TIME_ORDERING,
-            "Default merge strategy ID can only be used with the merge mode of EVENT_TIME_ORDERING");
-        inferredRecordMergeMode = EVENT_TIME_ORDERING;
-      } else if (recordMergeStrategyId.equals(COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)) {
-        checkArgument(recordMergeMode == null || recordMergeMode == COMMIT_TIME_ORDERING,
-            "Commit time ordering merger strategy ID can only be used with the merge mode of COMMIT_TIME_ORDERING");
-        inferredRecordMergeMode = COMMIT_TIME_ORDERING;
-      } else {
-        checkArgument(!recordMergeStrategyId.equals(PAYLOAD_BASED_MERGE_STRATEGY_UUID),
-            "Payload based strategy should only be used if you have a custom payload class set");
-        checkArgument(recordMergeMode == null || recordMergeMode == CUSTOM,
-            "Record merge mode must be set to custom when using a custom merge strategy ID");
-        inferredRecordMergeMode = CUSTOM;
-      }
+    // Check ordering field name based on record merge mode
+    if (inferredRecordMergeMode == COMMIT_TIME_ORDERING) {
+      if (nonEmpty(orderingFieldName)) {
+        LOG.warn("The precombine or ordering field ({}) is specified. COMMIT_TIME_ORDERING "
+            + "merge mode does not use precombine or ordering field anymore.", orderingFieldName);
       }
-      inferredPayloadClassName = HoodieRecordPayload.getAvroPayloadForMergeMode(inferredRecordMergeMode);
-    } else {
-      // payload class name is set
-      inferredPayloadClassName = payloadClassName;
-      if (payloadClassName.equals(DefaultHoodieRecordPayload.class.getName())) {
-        // Default payload matches with EVENT_TIME_ORDERING. However, Custom merge modes still have some gaps (tracked by [HUDI-8317]) so
-        // we will use default merger for now on the write path if the user has a custom merger. After all gaps have been closed, we will set
-        // a dummy payload by default for custom merge mode. Then we can get rid of this if else, and add the validation:
-        // checkArgument(isNullOrEmpty(recordMergeStrategyId) || recordMergeStrategyId.equals(DEFAULT_MERGER_STRATEGY_UUID),
-        // "Record merge strategy cannot be set if a merge payload is used");
-        if (isNullOrEmpty(recordMergeStrategyId) || recordMergeStrategyId.equals(DEFAULT_MERGE_STRATEGY_UUID)) {
-          // Default case, everything should be null or event time ordering / default strategy
-          checkArgument(recordMergeMode == null || recordMergeMode == EVENT_TIME_ORDERING,
-              "Only the record merge mode of EVENT_TIME_ORDERING can be used with default payload");
-          inferredRecordMergeMode = EVENT_TIME_ORDERING;
-          inferredRecordMergeStrategyId = DEFAULT_MERGE_STRATEGY_UUID;
-        } else {
-          // currently for the custom case. This block will be moved below and check if the payload class name is dummy
-          checkArgument(recordMergeMode == null || recordMergeMode == CUSTOM, "Record merge mode, payload class, and merge strategy are in an illegal configuration");
-          checkArgument(
-              !recordMergeStrategyId.equals(COMMIT_TIME_BASED_MERGE_STRATEGY_UUID) && !recordMergeStrategyId.equals(PAYLOAD_BASED_MERGE_STRATEGY_UUID),
-              "Record merger strategy is incompatible with payload class");
-          inferredRecordMergeMode = CUSTOM;
-          inferredRecordMergeStrategyId = recordMergeStrategyId;
-        }
-      } else if (payloadClassName.equals(OverwriteWithLatestAvroPayload.class.getName())) {
-        // strategy and merge mode must be unset or align with overwrite
-        checkArgument(isNullOrEmpty(recordMergeStrategyId) || recordMergeStrategyId.equals(COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
-            "Record merge strategy cannot be set if a merge payload is used");
-        checkArgument(recordMergeMode == null || recordMergeMode == COMMIT_TIME_ORDERING, "Only commit time ordering merge mode can be used with overwrite payload");
-        inferredRecordMergeMode = COMMIT_TIME_ORDERING;
-        inferredRecordMergeStrategyId = COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
-      } else {
-        // using custom avro payload
-        checkArgument(isNullOrEmpty(recordMergeStrategyId) || recordMergeStrategyId.equals(PAYLOAD_BASED_MERGE_STRATEGY_UUID),
-            "Record merge strategy cannot be set if a merge payload is used");
-        checkArgument(recordMergeMode == null || recordMergeMode == CUSTOM, "Record merge mode must be custom if payload is defined");
-        inferredRecordMergeMode = CUSTOM;
-        inferredRecordMergeStrategyId = PAYLOAD_BASED_MERGE_STRATEGY_UUID;
+    } else if (inferredRecordMergeMode == EVENT_TIME_ORDERING) {
+      if (isNullOrEmpty(orderingFieldName)) {
+        LOG.warn("The precombine or ordering field is not specified. EVENT_TIME_ORDERING "
+            + "merge mode requires precombine or ordering field to be set for getting the "
+            + "event time. Using commit time-based ordering now.");
       }
     }
+
+    // Inferring payload class name
+    inferredPayloadClassName = HoodieRecordPayload.getAvroPayloadForMergeMode(
+        inferredRecordMergeMode, payloadClassName);
+    // Inferring record merge strategy ID
+    inferredRecordMergeStrategyId = HoodieRecordMerger.getRecordMergeStrategyId(
+        inferredRecordMergeMode, inferredPayloadClassName, recordMergeStrategyId);
+
+    // For custom merge mode, either payload class name or record merge strategy ID must be configured
+    if (inferredRecordMergeMode == CUSTOM) {
+      checkArgument(nonEmpty(inferredPayloadClassName) || nonEmpty(inferredRecordMergeStrategyId),
+          "Either payload class name or record merge strategy ID must be configured "
+              + "in CUSTOM merge mode.");
+      if (PAYLOAD_BASED_MERGE_STRATEGY_UUID.equals(inferredRecordMergeStrategyId)) {
+        checkArgument(nonEmpty(inferredPayloadClassName),
+            "For payload class based merge strategy as a fallback, payload class name is "
+                + "required to be set.");
+      }
+      // TODO(HUDI-8925): remove this once the payload class name is no longer required
+      if (isNullOrEmpty(inferredPayloadClassName)) {
+        inferredPayloadClassName = DEFAULT_PAYLOAD_CLASS_NAME;
+      }
+    }
+
     return Triple.of(inferredRecordMergeMode, inferredPayloadClassName, inferredRecordMergeStrategyId);
   }
+  static RecordMergeMode inferRecordMergeModeFromPayloadClass(String payloadClassName) {
+    if (isNullOrEmpty(payloadClassName)) {
+      return null;
+    }
+    if (DefaultHoodieRecordPayload.class.getName().equals(payloadClassName)) {
+      // DefaultHoodieRecordPayload matches with EVENT_TIME_ORDERING.
+      return EVENT_TIME_ORDERING;
+    } else if (payloadClassName.equals(OverwriteWithLatestAvroPayload.class.getName())) {
+      // OverwriteWithLatestAvroPayload matches with COMMIT_TIME_ORDERING.
+      return COMMIT_TIME_ORDERING;
+    } else {
+      return CUSTOM;
+    }
+  }
+
+  static RecordMergeMode inferRecordMergeModeFromMergeStrategyId(String recordMergeStrategyId) {
+    if (isNullOrEmpty(recordMergeStrategyId)) {
+      return null;
+    }
+    if (recordMergeStrategyId.equals(EVENT_TIME_BASED_MERGE_STRATEGY_UUID)) {
+      return EVENT_TIME_ORDERING;
+    } else if (recordMergeStrategyId.equals(COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)) {
+      return COMMIT_TIME_ORDERING;
+    } else {
+      return CUSTOM;
+    }
+  }
+
   public String getPreCombineField() {
     return getString(PRECOMBINE_FIELD);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 199fc770096..25ad815f600 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -1393,7 +1393,8 @@ public class HoodieTableMetaClient implements Serializable {
     }
     Triple<RecordMergeMode, String, String> mergeConfigs =
-        HoodieTableConfig.inferCorrectMergingBehavior(recordMergeMode, payloadClassName, recordMergerStrategyId);
+        HoodieTableConfig.inferCorrectMergingBehavior(
+            recordMergeMode, payloadClassName, recordMergerStrategyId, preCombineField);
     tableConfig.setValue(RECORD_MERGE_MODE, mergeConfigs.getLeft().name());
     tableConfig.setValue(PAYLOAD_CLASS_NAME.key(), mergeConfigs.getMiddle());
     tableConfig.setValue(RECORD_MERGE_STRATEGY_ID, mergeConfigs.getRight());
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 36af35435fe..d65c80f89e9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -142,7 +142,7 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<String> RECORD_MERGER_STRATEGY_ID = ConfigOptions
       .key("record.merger.strategy")
       .stringType()
-      .defaultValue(HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID)
+      .defaultValue(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
       .withFallbackKeys(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key())
       .withDescription("Id of merger strategy. Hudi will pick HoodieRecordMerger implementations in record.merger.impls which has the same merger strategy id");
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 66f81221c34..a58140635c9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -132,7 +132,7 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--record-merger-strategy"}, description = "Id of record merger strategy. Hudi will pick HoodieRecordMerger implementations in record-merger-impls "
       + "which has the same record merger strategy id")
-  public String recordMergerStrategy = HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID;
+  public String recordMergerStrategy = HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
   @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
       + "is purely new data/inserts to gain speed).", converter = OperationConverter.class)
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index 5e23391e0ff..3dacf596258 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -54,11 +54,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.stream.Stream;
+import static org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING;
 import static org.apache.hudi.common.config.RecordMergeMode.CUSTOM;
 import static org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
-import static org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING;
-import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
+import static org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
@@ -270,7 +270,7 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
     HoodieConfig config = new HoodieConfig();
     config.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.SIX.versionCode()));
     config.setValue(HoodieTableConfig.INITIAL_VERSION, String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
-    config.setValue(RECORD_MERGE_MODE, RECORD_MERGE_MODE.defaultValue().name());
+    config.setValue(RECORD_MERGE_MODE, COMMIT_TIME_ORDERING.name());
     HoodieTableConfig.dropInvalidConfigs(config);
     assertTrue(config.contains(HoodieTableConfig.VERSION));
@@ -280,7 +280,7 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
     // test valid ones are not dropped
     config = new HoodieConfig();
     config.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
-    config.setValue(RECORD_MERGE_MODE, RECORD_MERGE_MODE.defaultValue().name());
+    config.setValue(RECORD_MERGE_MODE, COMMIT_TIME_ORDERING.name());
     HoodieTableConfig.dropInvalidConfigs(config);
     assertTrue(config.contains(RECORD_MERGE_MODE));
   }
@@ -300,84 +300,155 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
     String overwritePayload = OverwriteWithLatestAvroPayload.class.getName();
     String customPayload = "custom_payload";
     String customStrategy = "custom_strategy";
+    String orderingFieldName = "timestamp";
     Stream<Arguments> arguments = Stream.of(
         //test empty args with both null and ""
-        arguments(null, null, null, false, EVENT_TIME_ORDERING, defaultPayload, DEFAULT_MERGE_STRATEGY_UUID),
-        arguments(null, "", "", false, EVENT_TIME_ORDERING, defaultPayload, DEFAULT_MERGE_STRATEGY_UUID),
+        arguments(null, null, null, null,
+            false, COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, null, null, "",
+            false, COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, null, null, orderingFieldName,
+            false, EVENT_TIME_ORDERING, defaultPayload, EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, "", "", null,
+            false, COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, "", "", orderingFieldName,
+            false, EVENT_TIME_ORDERING, defaultPayload, EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
         //test legal event time ordering combos
-        arguments(EVENT_TIME_ORDERING, null, null, false, EVENT_TIME_ORDERING, defaultPayload, DEFAULT_MERGE_STRATEGY_UUID),
-        arguments(EVENT_TIME_ORDERING, defaultPayload, null, false, EVENT_TIME_ORDERING, defaultPayload, DEFAULT_MERGE_STRATEGY_UUID),
-        arguments(EVENT_TIME_ORDERING, defaultPayload, DEFAULT_MERGE_STRATEGY_UUID, false, EVENT_TIME_ORDERING, defaultPayload,
-            DEFAULT_MERGE_STRATEGY_UUID),
-        arguments(EVENT_TIME_ORDERING, null, DEFAULT_MERGE_STRATEGY_UUID, false, EVENT_TIME_ORDERING, defaultPayload, DEFAULT_MERGE_STRATEGY_UUID),
-        arguments(null, defaultPayload, null, false, EVENT_TIME_ORDERING, defaultPayload, DEFAULT_MERGE_STRATEGY_UUID),
-        arguments(null, defaultPayload, DEFAULT_MERGE_STRATEGY_UUID, false, EVENT_TIME_ORDERING, defaultPayload, DEFAULT_MERGE_STRATEGY_UUID),
-        arguments(null, null, DEFAULT_MERGE_STRATEGY_UUID, false, EVENT_TIME_ORDERING, defaultPayload, DEFAULT_MERGE_STRATEGY_UUID),
-
-        //test legal overwrite combos
-        arguments(COMMIT_TIME_ORDERING, null, null, false, COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
-        arguments(COMMIT_TIME_ORDERING, overwritePayload, null, false, COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
-        arguments(COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, false, COMMIT_TIME_ORDERING, overwritePayload,
-            COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
-        arguments(COMMIT_TIME_ORDERING, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, false, COMMIT_TIME_ORDERING, overwritePayload,
-            COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
-        arguments(null, overwritePayload, null, false, COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
-        arguments(null, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, false, COMMIT_TIME_ORDERING, overwritePayload,
-            COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
-        arguments(null, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, false, COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
-
-        //test legal custom payload combos
-        arguments(CUSTOM, customPayload, null, false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
-        arguments(CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
-        arguments(null, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
-        arguments(null, customPayload, null, false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
-
-        //test legal custom merger combos
-        arguments(CUSTOM, null, customStrategy, false, CUSTOM, defaultPayload, customStrategy),
-        //for now this case is ok but will need to be changed when we add dummy payload for [HUDI-8317]
-        arguments(CUSTOM, defaultPayload, customStrategy, false, CUSTOM, defaultPayload, customStrategy),
+        arguments(EVENT_TIME_ORDERING, null, null, null,
+            false, EVENT_TIME_ORDERING, defaultPayload, EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(EVENT_TIME_ORDERING, null, null, orderingFieldName,
+            false, EVENT_TIME_ORDERING, defaultPayload, EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(EVENT_TIME_ORDERING, defaultPayload, null, orderingFieldName,
+            false, EVENT_TIME_ORDERING, defaultPayload, EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(EVENT_TIME_ORDERING, defaultPayload, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, orderingFieldName,
+            false, EVENT_TIME_ORDERING, defaultPayload, EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(EVENT_TIME_ORDERING, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, orderingFieldName,
+            false, EVENT_TIME_ORDERING, defaultPayload, EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, defaultPayload, null, null,
+            false, EVENT_TIME_ORDERING, defaultPayload, EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, defaultPayload, null, orderingFieldName,
+            false, EVENT_TIME_ORDERING, defaultPayload, EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, defaultPayload, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, orderingFieldName,
+            false, EVENT_TIME_ORDERING, defaultPayload, EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            false, EVENT_TIME_ORDERING, defaultPayload, EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, orderingFieldName,
+            false, EVENT_TIME_ORDERING, defaultPayload, EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+
+        //test legal commit time ordering combos
+        arguments(COMMIT_TIME_ORDERING, null, null, null,
+            false, COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(COMMIT_TIME_ORDERING, null, null, "",
+            false, COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(COMMIT_TIME_ORDERING, null, null, orderingFieldName,
+            false, COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(COMMIT_TIME_ORDERING, overwritePayload, null, null,
+            false, COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            false, COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(COMMIT_TIME_ORDERING, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            false, COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, overwritePayload, null, null,
+            false, COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, overwritePayload, null, "",
+            false, COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, overwritePayload, null, orderingFieldName,
+            false, COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            false, COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            false, COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, orderingFieldName,
+            false, COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+
+        //test legal custom merge mode combos
+        arguments(CUSTOM, customPayload, null, null,
+            false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+        arguments(CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, null,
+            false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, null,
+            false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, customPayload, null, null,
+            false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+        arguments(CUSTOM, null, customStrategy, null,
+            false, CUSTOM, defaultPayload, customStrategy),
+        arguments(CUSTOM, customPayload, customStrategy, null,
+            false, CUSTOM, customPayload, customStrategy),
+
+        //test legal configs that work but should not be used usually
+        arguments(CUSTOM, defaultPayload, customStrategy, null,
+            false, CUSTOM, defaultPayload, customStrategy),
+        arguments(CUSTOM, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, null,
+            false, CUSTOM, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+        arguments(CUSTOM, overwritePayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, null,
+            false, CUSTOM, overwritePayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, null,
+            false, CUSTOM, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, overwritePayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, null,
+            false, CUSTOM, overwritePayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
         //test illegal combos due to missing info
-        arguments(CUSTOM, null, null, true, null, null, null),
-        arguments(CUSTOM, null, PAYLOAD_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
+        arguments(CUSTOM, null, null, null,
+            true, null, null, null),
+        arguments(CUSTOM, null, PAYLOAD_BASED_MERGE_STRATEGY_UUID, null,
+            true, null, null, null),
         //test illegal combos
-        arguments(EVENT_TIME_ORDERING, overwritePayload, null, true, null, null, null),
-        arguments(EVENT_TIME_ORDERING, customPayload, null, true, null, null, null),
-        arguments(EVENT_TIME_ORDERING, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
-        arguments(EVENT_TIME_ORDERING, null, customStrategy, true, null, null, null),
-        arguments(EVENT_TIME_ORDERING, null, PAYLOAD_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
-        arguments(COMMIT_TIME_ORDERING, defaultPayload, null, true, null, null, null),
-        arguments(COMMIT_TIME_ORDERING, customPayload, null, true, null, null, null),
-        arguments(COMMIT_TIME_ORDERING, null, DEFAULT_MERGE_STRATEGY_UUID, true, null, null, null),
-        arguments(COMMIT_TIME_ORDERING, null, customStrategy, true, null, null, null),
-        arguments(COMMIT_TIME_ORDERING, null, PAYLOAD_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
-        arguments(CUSTOM, defaultPayload, null, true, null, null, null),
-        arguments(CUSTOM, overwritePayload, null, true, null, null, null),
-        arguments(CUSTOM, null, DEFAULT_MERGE_STRATEGY_UUID, true, null, null, null),
-        arguments(CUSTOM, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
-        arguments(CUSTOM, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
-        arguments(CUSTOM, overwritePayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
-        arguments(CUSTOM, defaultPayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
-        arguments(CUSTOM, overwritePayload, DEFAULT_MERGE_STRATEGY_UUID, true, null, null, null),
-        arguments(null, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
-        arguments(null, overwritePayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
-        arguments(null, defaultPayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
-        arguments(null, overwritePayload, DEFAULT_MERGE_STRATEGY_UUID, true, null, null, null));
+        arguments(EVENT_TIME_ORDERING, overwritePayload, null, orderingFieldName,
+            true, null, null, null),
+        arguments(EVENT_TIME_ORDERING, customPayload, null, orderingFieldName,
+            true, null, null, null),
+        arguments(EVENT_TIME_ORDERING, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, orderingFieldName,
+            true, null, null, null),
+        arguments(EVENT_TIME_ORDERING, null, customStrategy, orderingFieldName,
+            true, null, null, null),
+        arguments(EVENT_TIME_ORDERING, null, PAYLOAD_BASED_MERGE_STRATEGY_UUID, orderingFieldName,
+            true, null, null, null),
+        arguments(COMMIT_TIME_ORDERING, defaultPayload, null, null,
+            true, null, null, null),
+        arguments(COMMIT_TIME_ORDERING, customPayload, null, null,
+            true, null, null, null),
+        arguments(COMMIT_TIME_ORDERING, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            true, null, null, null),
+        arguments(COMMIT_TIME_ORDERING, null, customStrategy, null,
+            true, null, null, null),
+        arguments(COMMIT_TIME_ORDERING, null, PAYLOAD_BASED_MERGE_STRATEGY_UUID, null,
+            true, null, null, null),
+        arguments(CUSTOM, defaultPayload, null, null,
+            true, null, null, null),
+        arguments(CUSTOM, overwritePayload, null, null,
+            true, null, null, null),
+        arguments(CUSTOM, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            true, null, null, null),
+        arguments(CUSTOM, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            true, null, null, null),
+        arguments(CUSTOM, defaultPayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            true, null, null, null),
+        arguments(CUSTOM, overwritePayload, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            true, null, null, null),
+        arguments(null, defaultPayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            true, null, null, null),
+        arguments(null, overwritePayload, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            true, null, null, null));
     return arguments;
   }
   @ParameterizedTest
   @MethodSource("argumentsForInferringRecordMergeMode")
-  public void testInferMergeMode(RecordMergeMode inputMergeMode, String inputPayloadClass, String inputMergeStrategy, boolean shouldThrow,
-                                 RecordMergeMode outputMergeMode, String outputPayloadClass, String outputMergeStrategy) {
+  public void testInferMergeMode(RecordMergeMode inputMergeMode, String inputPayloadClass,
+                                 String inputMergeStrategy, String orderingFieldName,
+                                 boolean shouldThrow, RecordMergeMode outputMergeMode,
+                                 String outputPayloadClass, String outputMergeStrategy) {
     if (shouldThrow) {
       assertThrows(IllegalArgumentException.class,
-          () -> HoodieTableConfig.inferCorrectMergingBehavior(inputMergeMode, inputPayloadClass, inputMergeStrategy));
+          () -> HoodieTableConfig.inferCorrectMergingBehavior(
+              inputMergeMode, inputPayloadClass, inputMergeStrategy, orderingFieldName));
     } else {
-      Triple<RecordMergeMode, String, String> inferredConfigs = HoodieTableConfig.inferCorrectMergingBehavior(inputMergeMode, inputPayloadClass, inputMergeStrategy);
+      Triple<RecordMergeMode, String, String> inferredConfigs =
+          HoodieTableConfig.inferCorrectMergingBehavior(
+              inputMergeMode, inputPayloadClass, inputMergeStrategy, orderingFieldName);
       assertEquals(outputMergeMode, inferredConfigs.getLeft());
       assertEquals(outputPayloadClass, inferredConfigs.getMiddle());
       assertEquals(outputMergeStrategy, inferredConfigs.getRight());
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
index 43cd4f7c4f0..31d0700935b 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
@@ -21,7 +21,6 @@ package org.apache.hudi.common.table.read;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
@@ -64,10 +63,9 @@ public class TestCustomMerger extends HoodieFileGroupReaderTestHarness {
   @Override
   protected Properties getMetaProps() {
-    Properties metaProps = super.getMetaProps();
+    Properties metaProps = super.getMetaProps();
     metaProps.setProperty(HoodieTableConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name());
     metaProps.setProperty(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), CustomAvroMerger.KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY);
-    metaProps.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), DefaultHoodieRecordPayload.class.getName());
     return metaProps;
   }
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
index 222c3e2d1bd..178b6a43113 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
@@ -20,9 +20,7 @@
 package org.apache.hudi.common.table.read;
 import org.apache.hudi.common.config.RecordMergeMode;
-import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
-import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.reader.HoodieAvroRecordTestMerger;
@@ -62,8 +60,6 @@ public class TestEventTimeMerging extends HoodieFileGroupReaderTestHarness {
   protected Properties getMetaProps() {
     Properties metaProps = super.getMetaProps();
     metaProps.setProperty(HoodieTableConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.EVENT_TIME_ORDERING.name());
-    metaProps.setProperty(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID);
-    metaProps.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), DefaultHoodieRecordPayload.class.getName());
     return metaProps;
   }
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestOverwriteWithLatestMerger.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestOverwriteWithLatestMerger.java
index a39809ced14..4453c439535 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestOverwriteWithLatestMerger.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestOverwriteWithLatestMerger.java
@@ -60,8 +60,6 @@ public class TestOverwriteWithLatestMerger extends HoodieFileGroupReaderTestHarn
   protected Properties getMetaProps() {
     Properties metaProps = super.getMetaProps();
     metaProps.setProperty(HoodieTableConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.COMMIT_TIME_ORDERING.name());
-    metaProps.setProperty(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
-    metaProps.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), OverwriteWithLatestAvroPayload.class.getName());
     return metaProps;
   }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/DefaultHiveRecordMerger.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/DefaultHiveRecordMerger.java
index bc7a0871917..dc03c755ee7 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/DefaultHiveRecordMerger.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/DefaultHiveRecordMerger.java
@@ -64,6 +64,6 @@ public class DefaultHiveRecordMerger extends HoodieHiveRecordMerger {
   @Override
   public String getMergingStrategy() {
-    return HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID;
+    return HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 39a8f815768..4ad8cbb25fb 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -857,9 +857,6 @@ object DataSourceWriteOptions {
   /** @deprecated Use {@link PRECOMBINE_FIELD} and its methods instead */
   @Deprecated
   val PRECOMBINE_FIELD_OPT_KEY = HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key()
-  /** @deprecated Use {@link PRECOMBINE_FIELD} and its methods instead */
-  @Deprecated
-  val DEFAULT_PRECOMBINE_FIELD_OPT_VAL = PRECOMBINE_FIELD.defaultValue()
   /** @deprecated Use {@link HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME} and its methods instead */
   @Deprecated
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index c6a9559b2c8..4a64319752f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -17,12 +17,6 @@
 package org.apache.hudi
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericData
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.hudi.AutoRecordKeyGenerationUtils.mayBeValidateParamsForAutoGenerationOfRecordKeys
 import org.apache.hudi.AvroConversionUtils.{convertAvroSchemaToStructType, convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
 import org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
@@ -34,33 +28,34 @@ import org.apache.hudi.HoodieSparkUtils.sparkAdapter
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema
 import org.apache.hudi.avro.HoodieAvroUtils
-import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.commit.{DatasetBulkInsertCommitActionExecutor, DatasetBulkInsertOverwriteCommitActionExecutor, DatasetBulkInsertOverwriteTableCommitActionExecutor}
 import org.apache.hudi.common.config._
 import org.apache.hudi.common.engine.HoodieEngineContext
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model._
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
-import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys
 import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
+import org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
-import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
+import org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY, WRITE_TABLE_VERSION}
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.{HoodieException, HoodieRecordCreationException, HoodieWriteConflictException}
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
-import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
+import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.SerDeHelper
+import org.apache.hudi.keygen.{BaseKeyGenerator, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.keygen.constant.KeyGeneratorType
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
-import org.apache.hudi.keygen.{BaseKeyGenerator, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.metrics.Metrics
 import org.apache.hudi.storage.HoodieStorage
 import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -68,18 +63,26 @@ import org.apache.hudi.sync.common.util.SyncUtilHelpers
 import org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException
 import org.apache.hudi.util.SparkConfigUtils.getStringWithAltKeys
 import org.apache.hudi.util.SparkKeyGenUtils
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.shims.ShimLoader
+import org.apache.spark.{SPARK_VERSION, SparkContext}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
-import org.apache.spark.sql.HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty
 import org.apache.spark.sql._
+import org.apache.spark.sql.HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.{SPARK_VERSION, SparkContext}
 import org.slf4j.LoggerFactory
 import java.util.function.BiConsumer
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
@@ -1127,10 +1130,10 @@ class HoodieSparkSqlWriterInternal {
       || !mergedParams.contains(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key())
       || !mergedParams.contains(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key())) {
       val inferredMergeConfigs = HoodieTableConfig.inferCorrectMergingBehavior(
-        RecordMergeMode.getValue(mergedParams.getOrElse(DataSourceWriteOptions.RECORD_MERGE_MODE.key(),
-          DataSourceWriteOptions.RECORD_MERGE_MODE.defaultValue())),
+        RecordMergeMode.getValue(mergedParams.getOrElse(DataSourceWriteOptions.RECORD_MERGE_MODE.key(), null)),
         mergedParams.getOrElse(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), ""),
-        mergedParams.getOrElse(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key(), ""))
+        mergedParams.getOrElse(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key(), ""),
+        optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
       mergedParams.put(DataSourceWriteOptions.RECORD_MERGE_MODE.key(), inferredMergeConfigs.getLeft.name())
       mergedParams.put(HoodieTableConfig.RECORD_MERGE_MODE.key(), inferredMergeConfigs.getLeft.name())
       mergedParams.put(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), inferredMergeConfigs.getMiddle)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index a6008cbf523..7de0617b23b 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -24,9 +24,9 @@ import org.apache.hudi.HoodieConversionUtils.toProperties
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, RecordMergeMode, TypedProperties}
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordMerger, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME
import org.apache.hudi.common.util.{ReflectionUtils, StringUtils}
import org.apache.hudi.config.{HoodieIndexConfig, HoodieInternalConfig, HoodieWriteConfig}
-import org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, MultiPartKeysValueExtractor}
import org.apache.hudi.hive.ddl.HiveSyncMode
@@ -259,10 +259,10 @@ trait ProvidesHoodieConfig extends Logging {
val deducedPayloadClassName = classOf[DefaultHoodieRecordPayload].getCanonicalName
val recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING.name
- val recordMergeStrategy = HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID
+ val recordMergeStrategy = HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID
if (tableConfig.getPayloadClass.equals(classOf[DefaultHoodieRecordPayload].getCanonicalName) &&
- tableConfig.getRecordMergeMode.equals(RecordMergeMode.EVENT_TIME_ORDERING)) {
+ RecordMergeMode.EVENT_TIME_ORDERING.equals(tableConfig.getRecordMergeMode)) {
tableConfig.clearValue(HoodieTableConfig.PAYLOAD_CLASS_NAME)
tableConfig.clearValue(HoodieTableConfig.RECORD_MERGE_MODE)
tableConfig.clearValue(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID)
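The flipped equals above is likely null-safety hardening rather than a cosmetic edit: once the merge mode has no table-level default, getRecordMergeMode can return null, and the constant-first form tolerates that. An illustrative Java sketch (names mirror the hunk):

    RecordMergeMode mode = tableConfig.getRecordMergeMode(); // may be null after this change
    // mode.equals(RecordMergeMode.EVENT_TIME_ORDERING)  -> NullPointerException when mode is null
    // RecordMergeMode.EVENT_TIME_ORDERING.equals(mode)  -> simply false when mode is null
    boolean isEventTime = RecordMergeMode.EVENT_TIME_ORDERING.equals(mode);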
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 240bf1c8a36..7ba05a9948c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -409,8 +409,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
assertTrue(metadataWriter.isPresent());
Triple<RecordMergeMode, String, String> inferredMergeConfs =
- HoodieTableConfig.inferCorrectMergingBehavior(writeConfig.getRecordMergeMode(), writeConfig.getPayloadClass(),
- writeConfig.getRecordMergeStrategyId());
+ HoodieTableConfig.inferCorrectMergingBehavior(
+ writeConfig.getRecordMergeMode(), writeConfig.getPayloadClass(),
+ writeConfig.getRecordMergeStrategyId(), writeConfig.getPreCombineField());
HoodieTableConfig hoodieTableConfig =
new HoodieTableConfig(this.storage, metaClient.getMetaPath(), inferredMergeConfs.getLeft(), inferredMergeConfs.getMiddle(), inferredMergeConfs.getRight());
assertFalse(hoodieTableConfig.getMetadataPartitions().isEmpty());
@@ -430,8 +431,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
assertFalse(metadataWriter2.isPresent());
Triple<RecordMergeMode, String, String> inferredMergeConfs2 =
- HoodieTableConfig.inferCorrectMergingBehavior(writeConfig2.getRecordMergeMode(), writeConfig2.getPayloadClass(),
- writeConfig2.getRecordMergeStrategyId());
+ HoodieTableConfig.inferCorrectMergingBehavior(
+ writeConfig2.getRecordMergeMode(), writeConfig2.getPayloadClass(),
+ writeConfig2.getRecordMergeStrategyId(), writeConfig2.getPreCombineField());
HoodieTableConfig hoodieTableConfig2 =
new HoodieTableConfig(this.storage, metaClient.getMetaPath(), inferredMergeConfs2.getLeft(),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index b020598aa24..c4db65a1bba 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -30,10 +30,11 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestUtils}
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-import org.apache.avro.Schema
-import org.apache.commons.io.FileUtils
import org.apache.hudi.DataSourceWriteOptions.{DROP_INSERT_DUP_POLICY, FAIL_INSERT_DUP_POLICY, INSERT_DROP_DUPS, INSERT_DUP_POLICY}
import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig}
+
+import org.apache.avro.Schema
+import org.apache.commons.io.FileUtils
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{expr, lit}
@@ -49,6 +50,7 @@ import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, intercept}
import java.time.Instant
import java.util.{Collections, Date, UUID}
+
import scala.collection.JavaConverters._
/**
@@ -612,7 +614,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
.setBaseFileFormat(fooTableParams.getOrElse(HoodieWriteConfig.BASE_FILE_FORMAT.key, HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name))
.setArchiveLogFolder(HoodieTableConfig.TIMELINE_HISTORY_PATH.defaultValue())
- .setPreCombineField(fooTableParams.getOrElse(DataSourceWriteOptions.PRECOMBINE_FIELD.key, DataSourceWriteOptions.PRECOMBINE_FIELD.defaultValue()))
+ .setPreCombineField(fooTableParams.getOrElse(DataSourceWriteOptions.PRECOMBINE_FIELD.key, null))
.setPartitionFields(fooTableParams(DataSourceWriteOptions.PARTITIONPATH_FIELD.key))
.setKeyGeneratorClassProp(fooTableParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.defaultValue()))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
index 1f1e99b14ad..c11f5cc5140 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hudi.common
import org.apache.hudi.common.model.{HoodieRecordMerger, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.HoodieTableConfig
-import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.spark.sql.hudi.HoodieOptionConfig
@@ -42,8 +41,7 @@ class TestHoodieOptionConfig extends SparkClientFunctionalTestHarness {
"preCombineField" -> "timestamp",
"type" -> "mor",
"payloadClass" -> classOf[OverwriteWithLatestAvroPayload].getName,
- "recordMergeStrategyId" ->
HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID,
- "recordMergeMode" -> HoodieWriteConfig.RECORD_MERGE_MODE.defaultValue
+ "recordMergeStrategyId" ->
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID
)
val with2 = HoodieOptionConfig.withDefaultSqlOptions(ops2)
assertTrue(ops2 == with2)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
index 620746abe4d..e02a9b30c63 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hudi.common
import org.apache.hudi.DataSourceReadOptions._
-import org.apache.hudi.common.config.DFSPropertiesConfiguration
+import org.apache.hudi.common.config.{DFSPropertiesConfiguration, RecordMergeMode}
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.testutils.HoodieTestUtils
@@ -82,7 +82,7 @@ class TestSqlConf extends HoodieSparkSqlTestBase with BeforeAndAfter {
assertResult(true)(Files.exists(Paths.get(s"$tablePath/$partitionVal")))
assertResult(HoodieTableType.MERGE_ON_READ)(new HoodieTableConfig(
HoodieStorageUtils.getStorage(tablePath, HoodieTestUtils.getDefaultStorageConf),
- new StoragePath(tablePath, HoodieTableMetaClient.METAFOLDER_NAME), HoodieTableConfig.RECORD_MERGE_MODE.defaultValue(), null, null).getTableType)
+ new StoragePath(tablePath, HoodieTableMetaClient.METAFOLDER_NAME), RecordMergeMode.COMMIT_TIME_ORDERING, null, null).getTableType)
// Manually pass incremental configs to global configs to make sure Hudi query is able to load the
// global configs
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
index c4699db3bed..50fd71c949f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
@@ -124,8 +124,7 @@ public class MaxwellJsonKafkaSourcePostProcessor extends JsonKafkaSourcePostProc
// we can update the `update_time`(delete time) only when it is in timestamp format.
if (!preCombineFieldType.equals(NON_TIMESTAMP)) {
- String preCombineField = this.props.getString(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(),
- HoodieWriteConfig.PRECOMBINE_FIELD_NAME.defaultValue());
+ String preCombineField = this.props.getString(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), null);
// ts from maxwell
long ts = inputJson.get(TS).longValue();
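With the lookup above falling back to null instead of "ts", the post-processor only sees a precombine field the user configured explicitly. A hypothetical usage sketch (the value "update_time" is a placeholder):

    TypedProperties props = new TypedProperties();
    props.setProperty(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "update_time");
    // Mirrors the patched lookup: resolves to null, not "ts", when unset.
    String preCombineField = props.getString(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), null);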
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
index cc261545554..2493451ec1a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
@@ -207,8 +207,7 @@ public class BootstrapExecutor implements Serializable {
.setTableType(cfg.tableType)
.setTableName(cfg.targetTableName)
.setRecordKeyFields(props.getString(RECORDKEY_FIELD_NAME.key()))
- .setPreCombineField(props.getString(
- PRECOMBINE_FIELD_NAME.key(), PRECOMBINE_FIELD_NAME.defaultValue()))
+ .setPreCombineField(props.getString(PRECOMBINE_FIELD_NAME.key(), null))
.setTableVersion(ConfigUtils.getIntWithAltKeys(props, WRITE_TABLE_VERSION))
.setPopulateMetaFields(props.getBoolean(POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue()))
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index f784a7dba6d..074ad8743b5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -153,7 +153,8 @@ public class HoodieStreamer implements Serializable {
public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
Option<TypedProperties> propsOverride,
Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
Triple<RecordMergeMode, String, String> mergingConfigs =
- HoodieTableConfig.inferCorrectMergingBehavior(cfg.recordMergeMode, cfg.payloadClassName, cfg.recordMergeStrategyId);
+ HoodieTableConfig.inferCorrectMergingBehavior(
+ cfg.recordMergeMode, cfg.payloadClassName, cfg.recordMergeStrategyId, cfg.sourceOrderingField);
cfg.recordMergeMode = mergingConfigs.getLeft();
cfg.payloadClassName = mergingConfigs.getMiddle();
cfg.recordMergeStrategyId = mergingConfigs.getRight();
@@ -267,8 +268,8 @@ public class HoodieStreamer implements Serializable {
public String sourceClassName = JsonDFSSource.class.getName();
@Parameter(names = {"--source-ordering-field"}, description = "Field
within source record to decide how"
- + " to break ties between records with same key in input data.
Default: 'ts' holding unix timestamp of record")
- public String sourceOrderingField = "ts";
+ + " to break ties between records with same key in input data.")
+ public String sourceOrderingField = null;
@Parameter(names = {"--payload-class"}, description = "Deprecated. "
+ "Use --merge-mode for commit time or event time merging. "
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
index ff726d7db23..2d9d2cdc9c7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -83,7 +83,9 @@ public class HoodieStreamerUtils {
boolean useConsistentLogicalTimestamp = ConfigUtils.getBooleanWithAltKeys(
props, KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
Set<String> partitionColumns = getPartitionColumns(props);
- String payloadClassName = StringUtils.isNullOrEmpty(cfg.payloadClassName) ? HoodieRecordPayload.getAvroPayloadForMergeMode(cfg.recordMergeMode) : cfg.payloadClassName;
+ String payloadClassName = StringUtils.isNullOrEmpty(cfg.payloadClassName)
+ ? HoodieRecordPayload.getAvroPayloadForMergeMode(cfg.recordMergeMode, cfg.payloadClassName)
+ : cfg.payloadClassName;
return avroRDDOptional.map(avroRDD -> {
SerializableSchema avroSchema = new SerializableSchema(schemaProvider.getTargetSchema());
SerializableSchema processedAvroSchema = new SerializableSchema(isDropPartitionColumns(props) ? HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
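getAvroPayloadForMergeMode now receives the user payload class alongside the merge mode, keeping payload resolution consistent with whichever mode was inferred. A sketch of the call under the new default, assuming a null payload class is acceptable when unset (as the guarded caller above suggests):

    // With no ordering field configured, inference yields COMMIT_TIME_ORDERING, and the
    // payload class is derived from that mode rather than from an event-time default.
    // The exact class name returned is decided inside HoodieRecordPayload, not here.
    String resolved = HoodieRecordPayload.getAvroPayloadForMergeMode(
        RecordMergeMode.COMMIT_TIME_ORDERING, null);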
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 7a9372c5ac7..989be986b40 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -622,7 +622,8 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
}
cfg.allowCommitOnNoCheckpointChange = allowCommitOnNoCheckpointChange;
Triple<RecordMergeMode, String, String> mergeCfgs =
- HoodieTableConfig.inferCorrectMergingBehavior(cfg.recordMergeMode, cfg.payloadClassName, cfg.recordMergeStrategyId);
+ HoodieTableConfig.inferCorrectMergingBehavior(
+ cfg.recordMergeMode, cfg.payloadClassName, cfg.recordMergeStrategyId, cfg.sourceOrderingField);
cfg.recordMergeMode = mergeCfgs.getLeft();
cfg.payloadClassName = mergeCfgs.getMiddle();
cfg.recordMergeStrategyId = mergeCfgs.getRight();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 55c03716d53..cf504ef5fd6 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -192,7 +192,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
opts.put(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), DefaultSparkRecordMerger.class.getName());
opts.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
opts.put(HoodieWriteConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name());
- opts.put(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key(), HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID);
+ opts.put(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key(), HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
for (Map.Entry<String, String> entry : opts.entrySet()) {
hoodieConfig.add(String.format("%s=%s", entry.getKey(), entry.getValue()));
}
}
@@ -561,7 +561,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
addRecordMerger(recordType, cfg.configs);
cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
- cfg.recordMergeStrategyId = HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID;
+ cfg.recordMergeStrategyId = HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
cfg.recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" +
basePath + "/source.avsc");
cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" +
basePath + "/source.avsc");