This is an automated email from the ASF dual-hosted git repository.
akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ec5022b4fdd [MINOR] Unify naming for record merger (#7660)
ec5022b4fdd is described below
commit ec5022b4fdd94c106bf243038aadf781f31b5be9
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Jan 18 12:30:52 2023 -0500
[MINOR] Unify naming for record merger (#7660)
Addressing inconsistent usage of "record merger impls"/"record merger
strategy" and "merger impls"/"merger strategy"
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 31 +++++++++---------
.../functional/TestHoodieBackedMetadata.java | 6 ++--
.../hudi/common/model/HoodieAvroRecordMerger.java | 2 +-
.../hudi/common/table/HoodieTableConfig.java | 20 ++++++------
.../hudi/common/table/HoodieTableMetaClient.java | 38 +++++++++++-----------
.../apache/hudi/common/util/HoodieRecordUtils.java | 4 +--
.../apache/hudi/streamer/FlinkStreamerConfig.java | 15 +++++----
.../apache/hudi/hadoop/TestInputPathHandler.java | 2 +-
.../scala/org/apache/hudi/DataSourceOptions.scala | 4 +--
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 14 ++++----
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 4 +--
.../src/main/scala/org/apache/hudi/Iterators.scala | 4 +--
.../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala | 4 +--
.../apache/spark/sql/hudi/HoodieOptionConfig.scala | 10 +++---
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 4 +--
.../hudi/command/MergeIntoHoodieTableCommand.scala | 2 +-
.../apache/hudi/functional/TestCOWDataSource.scala | 2 +-
.../apache/hudi/functional/TestMORDataSource.scala | 2 +-
.../ReadAndWriteWithoutAvroBenchmark.scala | 6 ++--
.../spark/sql/hudi/HoodieSparkSqlTestBase.scala | 4 +--
.../spark/sql/hudi/TestHoodieOptionConfig.scala | 4 +--
.../apache/spark/sql/hudi/TestInsertTable.scala | 2 +-
.../org/apache/spark/sql/hudi/TestSpark3DDL.scala | 2 +-
.../org/apache/spark/sql/hudi/TestSqlConf.scala | 2 +-
.../hudi/utilities/deltastreamer/DeltaSync.java | 6 ++--
.../deltastreamer/TestHoodieDeltaStreamer.java | 2 +-
rfc/rfc-46/rfc-46.md | 2 +-
27 files changed, 100 insertions(+), 98 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 879136206e2..7367826e50d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -135,17 +135,17 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Payload class used. Override this, if you like to
roll your own merge logic, when upserting/inserting. "
+ "This will render any value set for PRECOMBINE_FIELD_OPT_VAL
in-effective");
- public static final ConfigProperty<String> MERGER_IMPLS = ConfigProperty
- .key("hoodie.datasource.write.merger.impls")
+ public static final ConfigProperty<String> RECORD_MERGER_IMPLS =
ConfigProperty
+ .key("hoodie.datasource.write.record.merger.impls")
.defaultValue(HoodieAvroRecordMerger.class.getName())
.withDocumentation("List of HoodieMerger implementations constituting
Hudi's merging strategy -- based on the engine used. "
- + "These merger impls will filter by
hoodie.datasource.write.merger.strategy "
+ + "These merger impls will filter by
hoodie.datasource.write.record.merger.strategy "
+ "Hudi will pick most efficient implementation to perform
merging/combining of the records (during update, reading MOR table, etc)");
- public static final ConfigProperty<String> MERGER_STRATEGY = ConfigProperty
- .key("hoodie.datasource.write.merger.strategy")
+ public static final ConfigProperty<String> RECORD_MERGER_STRATEGY =
ConfigProperty
+ .key("hoodie.datasource.write.record.merger.strategy")
.defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
- .withDocumentation("Id of merger strategy. Hudi will pick
HoodieRecordMerger implementations in hoodie.datasource.write.merger.impls
which has the same merger strategy id");
+ .withDocumentation("Id of merger strategy. Hudi will pick
HoodieRecordMerger implementations in
hoodie.datasource.write.record.merger.impls which has the same merger strategy
id");
public static final ConfigProperty<String> KEYGENERATOR_CLASS_NAME =
ConfigProperty
.key("hoodie.datasource.write.keygenerator.class")
@@ -971,12 +971,12 @@ public class HoodieWriteConfig extends HoodieConfig {
}
public HoodieRecordMerger getRecordMerger() {
- List<String> mergers = getSplitStringsOrDefault(MERGER_IMPLS).stream()
+ List<String> mergers =
getSplitStringsOrDefault(RECORD_MERGER_IMPLS).stream()
.map(String::trim)
.distinct()
.collect(Collectors.toList());
- String mergerStrategy = getString(MERGER_STRATEGY);
- return HoodieRecordUtils.createRecordMerger(getString(BASE_PATH),
engineType, mergers, mergerStrategy);
+ String recordMergerStrategy = getString(RECORD_MERGER_STRATEGY);
+ return HoodieRecordUtils.createRecordMerger(getString(BASE_PATH),
engineType, mergers, recordMergerStrategy);
}
public String getSchema() {
@@ -987,8 +987,8 @@ public class HoodieWriteConfig extends HoodieConfig {
setValue(AVRO_SCHEMA_STRING, schemaStr);
}
- public void setMergerClass(String mergerStrategy) {
- setValue(MERGER_STRATEGY, mergerStrategy);
+ public void setRecordMergerClass(String recordMergerStrategy) {
+ setValue(RECORD_MERGER_STRATEGY, recordMergerStrategy);
}
/**
@@ -1934,6 +1934,7 @@ public class HoodieWriteConfig extends HoodieConfig {
public String getDatadogApiKey() {
if (props.containsKey(HoodieMetricsDatadogConfig.API_KEY.key())) {
return getString(HoodieMetricsDatadogConfig.API_KEY);
+
} else {
Supplier<String> apiKeySupplier = ReflectionUtils.loadClass(
getString(HoodieMetricsDatadogConfig.API_KEY_SUPPLIER));
@@ -2391,13 +2392,13 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
- public Builder withMergerImpls(String mergerImpls) {
- writeConfig.setValue(MERGER_IMPLS, mergerImpls);
+ public Builder withRecordMergerImpls(String recordMergerImpls) {
+ writeConfig.setValue(RECORD_MERGER_IMPLS, recordMergerImpls);
return this;
}
- public Builder withMergerStrategy(String mergerStrategy) {
- writeConfig.setValue(MERGER_STRATEGY, mergerStrategy);
+ public Builder withRecordMergerStrategy(String recordMergerStrategy) {
+ writeConfig.setValue(RECORD_MERGER_STRATEGY, recordMergerStrategy);
return this;
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 26c8ef32734..bb97b6e858e 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -358,7 +358,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
assertTrue(metadataWriter.isPresent());
HoodieTableConfig hoodieTableConfig =
- new HoodieTableConfig(this.fs, metaClient.getMetaPath(),
writeConfig.getPayloadClass(),
writeConfig.getStringOrDefault(HoodieWriteConfig.MERGER_IMPLS));
+ new HoodieTableConfig(this.fs, metaClient.getMetaPath(),
writeConfig.getPayloadClass(),
writeConfig.getStringOrDefault(HoodieWriteConfig.RECORD_MERGER_IMPLS));
assertFalse(hoodieTableConfig.getMetadataPartitions().isEmpty());
// Turn off metadata table
@@ -375,7 +375,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
assertFalse(metadataWriter2.isPresent());
HoodieTableConfig hoodieTableConfig2 =
- new HoodieTableConfig(this.fs, metaClient.getMetaPath(),
writeConfig2.getPayloadClass(),
writeConfig.getStringOrDefault(HoodieWriteConfig.MERGER_IMPLS));
+ new HoodieTableConfig(this.fs, metaClient.getMetaPath(),
writeConfig2.getPayloadClass(),
writeConfig.getStringOrDefault(HoodieWriteConfig.RECORD_MERGER_IMPLS));
assertEquals(Collections.emptySet(),
hoodieTableConfig2.getMetadataPartitions());
// Assert metadata table folder is deleted
assertFalse(metaClient.getFs().exists(
@@ -397,7 +397,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
validateMetadata(testTable, true);
assertTrue(metadataWriter3.isPresent());
HoodieTableConfig hoodieTableConfig3 =
- new HoodieTableConfig(this.fs, metaClient.getMetaPath(),
writeConfig.getPayloadClass(),
writeConfig.getStringOrDefault(HoodieWriteConfig.MERGER_IMPLS));
+ new HoodieTableConfig(this.fs, metaClient.getMetaPath(),
writeConfig.getPayloadClass(),
writeConfig.getStringOrDefault(HoodieWriteConfig.RECORD_MERGER_IMPLS));
assertFalse(hoodieTableConfig3.getMetadataPartitions().isEmpty());
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
index b9e29787f8a..e49d560b74c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
@@ -98,7 +98,7 @@ public class HoodieAvroRecordMerger implements
HoodieRecordMerger {
}
public static ConfigProperty<String> LEGACY_OPERATING_MODE =
- ConfigProperty.key("hoodie.datasource.write.merger.legacy.operation")
+
ConfigProperty.key("hoodie.datasource.write.record.merger.legacy.operation")
.defaultValue(LegacyOperationMode.COMBINING.name())
.withDocumentation("Controls the mode of the merging operation
performed by `HoodieAvroRecordMerger`. "
+ "This is required to maintain backward-compatibility w/ the
existing semantic of `HoodieRecordPayload` "
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 549d5559c2f..90b10c60ed0 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -173,10 +173,10 @@ public class HoodieTableConfig extends HoodieConfig {
.withDocumentation("Payload class to use for performing compactions, i.e
merge delta logs with current base file and then "
+ " produce a new base file.");
- public static final ConfigProperty<String> MERGER_STRATEGY = ConfigProperty
- .key("hoodie.compaction.merger.strategy")
+ public static final ConfigProperty<String> RECORD_MERGER_STRATEGY =
ConfigProperty
+ .key("hoodie.compaction.record.merger.strategy")
.defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
- .withDocumentation("Id of merger strategy. Hudi will pick
HoodieRecordMerger implementations in hoodie.datasource.write.merger.impls
which has the same merger strategy id");
+ .withDocumentation("Id of merger strategy. Hudi will pick
HoodieRecordMerger implementations in
hoodie.datasource.write.record.merger.impls which has the same merger strategy
id");
public static final ConfigProperty<String> ARCHIVELOG_FOLDER = ConfigProperty
.key("hoodie.archivelog.folder")
@@ -265,7 +265,7 @@ public class HoodieTableConfig extends HoodieConfig {
private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; //
<database_name>.<table_name>
- public HoodieTableConfig(FileSystem fs, String metaPath, String
payloadClassName, String mergerStrategyId) {
+ public HoodieTableConfig(FileSystem fs, String metaPath, String
payloadClassName, String recordMergerStrategyId) {
super();
Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
LOG.info("Loading table properties from " + propertyPath);
@@ -277,9 +277,9 @@ public class HoodieTableConfig extends HoodieConfig {
setValue(PAYLOAD_CLASS_NAME, payloadClassName);
needStore = true;
}
- if (contains(MERGER_STRATEGY) && payloadClassName != null
- && !getString(MERGER_STRATEGY).equals(mergerStrategyId)) {
- setValue(MERGER_STRATEGY, mergerStrategyId);
+ if (contains(RECORD_MERGER_STRATEGY) && payloadClassName != null
+ &&
!getString(RECORD_MERGER_STRATEGY).equals(recordMergerStrategyId)) {
+ setValue(RECORD_MERGER_STRATEGY, recordMergerStrategyId);
needStore = true;
}
if (needStore) {
@@ -449,7 +449,7 @@ public class HoodieTableConfig extends HoodieConfig {
hoodieConfig.setDefaultValue(TYPE);
if
(hoodieConfig.getString(TYPE).equals(HoodieTableType.MERGE_ON_READ.name())) {
hoodieConfig.setDefaultValue(PAYLOAD_CLASS_NAME);
- hoodieConfig.setDefaultValue(MERGER_STRATEGY);
+ hoodieConfig.setDefaultValue(RECORD_MERGER_STRATEGY);
}
hoodieConfig.setDefaultValue(ARCHIVELOG_FOLDER);
if (!hoodieConfig.contains(TIMELINE_LAYOUT_VERSION)) {
@@ -522,8 +522,8 @@ public class HoodieTableConfig extends HoodieConfig {
/**
* Read the payload class for HoodieRecords from the table properties.
*/
- public String getMergerStrategy() {
- return getStringOrDefault(MERGER_STRATEGY);
+ public String getRecordMergerStrategy() {
+ return getStringOrDefault(RECORD_MERGER_STRATEGY);
}
public String getPreCombineField() {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 1ff0a589af9..672d057f1e1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -126,7 +126,7 @@ public class HoodieTableMetaClient implements Serializable {
*/
protected HoodieTableMetaClient(Configuration conf, String basePath, boolean
loadActiveTimelineOnLoad,
ConsistencyGuardConfig consistencyGuardConfig,
Option<TimelineLayoutVersion> layoutVersion,
- String payloadClassName, String
mergerStrategy, FileSystemRetryConfig fileSystemRetryConfig) {
+ String payloadClassName, String
recordMergerStrategy, FileSystemRetryConfig fileSystemRetryConfig) {
LOG.info("Loading HoodieTableMetaClient from " + basePath);
this.consistencyGuardConfig = consistencyGuardConfig;
this.fileSystemRetryConfig = fileSystemRetryConfig;
@@ -135,7 +135,7 @@ public class HoodieTableMetaClient implements Serializable {
this.metaPath = new SerializablePath(new CachingPath(basePath,
METAFOLDER_NAME));
this.fs = getFs();
TableNotFoundException.checkTableValidity(fs, this.basePath.get(),
metaPath.get());
- this.tableConfig = new HoodieTableConfig(fs, metaPath.toString(),
payloadClassName, mergerStrategy);
+ this.tableConfig = new HoodieTableConfig(fs, metaPath.toString(),
payloadClassName, recordMergerStrategy);
this.tableType = tableConfig.getTableType();
Option<TimelineLayoutVersion> tableConfigVersion =
tableConfig.getTimelineLayoutVersion();
if (layoutVersion.isPresent() && tableConfigVersion.isPresent()) {
@@ -170,7 +170,7 @@ public class HoodieTableMetaClient implements Serializable {
.setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig)
.setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion))
.setPayloadClassName(null)
- .setMergerStrategy(null)
+ .setRecordMergerStrategy(null)
.setFileSystemRetryConfig(oldMetaClient.fileSystemRetryConfig).build();
}
@@ -680,17 +680,17 @@ public class HoodieTableMetaClient implements
Serializable {
private static HoodieTableMetaClient newMetaClient(Configuration conf,
String basePath, boolean loadActiveTimelineOnLoad,
ConsistencyGuardConfig consistencyGuardConfig,
Option<TimelineLayoutVersion> layoutVersion,
- String payloadClassName, String mergerStrategy, FileSystemRetryConfig
fileSystemRetryConfig, Properties props) {
+ String payloadClassName, String recordMergerStrategy,
FileSystemRetryConfig fileSystemRetryConfig, Properties props) {
HoodieMetaserverConfig metaserverConfig = null == props
? new HoodieMetaserverConfig.Builder().build()
: new HoodieMetaserverConfig.Builder().fromProperties(props).build();
return metaserverConfig.isMetaserverEnabled()
? (HoodieTableMetaClient)
ReflectionUtils.loadClass("org.apache.hudi.common.table.HoodieTableMetaserverClient",
new Class<?>[] {Configuration.class, ConsistencyGuardConfig.class,
String.class, FileSystemRetryConfig.class, String.class, String.class,
HoodieMetaserverConfig.class},
- conf, consistencyGuardConfig, mergerStrategy, fileSystemRetryConfig,
+ conf, consistencyGuardConfig, recordMergerStrategy,
fileSystemRetryConfig,
props.getProperty(HoodieTableConfig.DATABASE_NAME.key()),
props.getProperty(HoodieTableConfig.NAME.key()), metaserverConfig)
: new HoodieTableMetaClient(conf, basePath,
- loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion,
payloadClassName, mergerStrategy, fileSystemRetryConfig);
+ loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion,
payloadClassName, recordMergerStrategy, fileSystemRetryConfig);
}
public static Builder builder() {
@@ -706,7 +706,7 @@ public class HoodieTableMetaClient implements Serializable {
private String basePath;
private boolean loadActiveTimelineOnLoad = false;
private String payloadClassName = null;
- private String mergerStrategy = null;
+ private String recordMergerStrategy = null;
private ConsistencyGuardConfig consistencyGuardConfig =
ConsistencyGuardConfig.newBuilder().build();
private FileSystemRetryConfig fileSystemRetryConfig =
FileSystemRetryConfig.newBuilder().build();
private Option<TimelineLayoutVersion> layoutVersion =
Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION);
@@ -732,8 +732,8 @@ public class HoodieTableMetaClient implements Serializable {
return this;
}
- public Builder setMergerStrategy(String mergerStrategy) {
- this.mergerStrategy = mergerStrategy;
+ public Builder setRecordMergerStrategy(String recordMergerStrategy) {
+ this.recordMergerStrategy = recordMergerStrategy;
return this;
}
@@ -762,7 +762,7 @@ public class HoodieTableMetaClient implements Serializable {
ValidationUtils.checkArgument(basePath != null, "basePath needs to be
set to init HoodieTableMetaClient");
return newMetaClient(conf, basePath,
loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion,
payloadClassName,
- mergerStrategy, fileSystemRetryConfig, props);
+ recordMergerStrategy, fileSystemRetryConfig, props);
}
}
@@ -782,7 +782,7 @@ public class HoodieTableMetaClient implements Serializable {
private String recordKeyFields;
private String archiveLogFolder;
private String payloadClassName;
- private String mergerStrategy;
+ private String recordMergerStrategy;
private Integer timelineLayoutVersion;
private String baseFileFormat;
private String preCombineField;
@@ -852,8 +852,8 @@ public class HoodieTableMetaClient implements Serializable {
return this;
}
- public PropertyBuilder setMergerStrategy(String mergerStrategy) {
- this.mergerStrategy = mergerStrategy;
+ public PropertyBuilder setRecordMergerStrategy(String
recordMergerStrategy) {
+ this.recordMergerStrategy = recordMergerStrategy;
return this;
}
@@ -977,7 +977,7 @@ public class HoodieTableMetaClient implements Serializable {
.setTableName(metaClient.getTableConfig().getTableName())
.setArchiveLogFolder(metaClient.getArchivePath())
.setPayloadClassName(metaClient.getTableConfig().getPayloadClass())
- .setMergerStrategy(metaClient.getTableConfig().getMergerStrategy());
+
.setRecordMergerStrategy(metaClient.getTableConfig().getRecordMergerStrategy());
}
public PropertyBuilder fromProperties(Properties properties) {
@@ -1007,9 +1007,9 @@ public class HoodieTableMetaClient implements
Serializable {
setPayloadClassName(
hoodieConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME));
}
- if (hoodieConfig.contains(HoodieTableConfig.MERGER_STRATEGY)) {
- setMergerStrategy(
- hoodieConfig.getString(HoodieTableConfig.MERGER_STRATEGY));
+ if (hoodieConfig.contains(HoodieTableConfig.RECORD_MERGER_STRATEGY)) {
+ setRecordMergerStrategy(
+ hoodieConfig.getString(HoodieTableConfig.RECORD_MERGER_STRATEGY));
}
if (hoodieConfig.contains(HoodieTableConfig.TIMELINE_LAYOUT_VERSION)) {
setTimelineLayoutVersion(hoodieConfig.getInt(HoodieTableConfig.TIMELINE_LAYOUT_VERSION));
@@ -1097,8 +1097,8 @@ public class HoodieTableMetaClient implements
Serializable {
if (tableType == HoodieTableType.MERGE_ON_READ && payloadClassName !=
null) {
tableConfig.setValue(HoodieTableConfig.PAYLOAD_CLASS_NAME,
payloadClassName);
}
- if (tableType == HoodieTableType.MERGE_ON_READ && mergerStrategy !=
null) {
- tableConfig.setValue(HoodieTableConfig.MERGER_STRATEGY,
mergerStrategy);
+ if (tableType == HoodieTableType.MERGE_ON_READ && recordMergerStrategy
!= null) {
+ tableConfig.setValue(HoodieTableConfig.RECORD_MERGER_STRATEGY,
recordMergerStrategy);
}
if (null != tableCreateSchema) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
index 666a084877b..31f9e226c00 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -67,7 +67,7 @@ public class HoodieRecordUtils {
* Instantiate a given class with a record merge.
*/
public static HoodieRecordMerger createRecordMerger(String basePath,
EngineType engineType,
- List<String> mergerClassList, String mergerStrategy) {
+ List<String> mergerClassList, String recordMergerStrategy) {
if (mergerClassList.isEmpty() ||
HoodieTableMetadata.isMetadataTable(basePath)) {
return
HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
} else {
@@ -81,7 +81,7 @@ public class HoodieRecordUtils {
}
})
.filter(Objects::nonNull)
- .filter(merger -> merger.getMergingStrategy().equals(mergerStrategy))
+ .filter(merger ->
merger.getMergingStrategy().equals(recordMergerStrategy))
.filter(merger -> recordTypeCompatibleEngine(merger.getRecordType(),
engineType))
.findFirst()
.orElse(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()));
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 9454fd9e77e..e110cb6ba90 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -121,13 +121,14 @@ public class FlinkStreamerConfig extends Configuration {
+ "a GenericRecord. Implement your own, if you want to do something
other than overwriting existing value.")
public String payloadClassName =
OverwriteWithLatestAvroPayload.class.getName();
- @Parameter(names = {"--merger-impls"}, description = "List of HoodieMerger
implementations constituting Hudi's merging strategy -- based on the engine
used. "
- + "These merger impls will filter by merger-strategy "
+ @Parameter(names = {"--record-merger-impls"}, description = "List of
HoodieMerger implementations constituting Hudi's record merging strategy --
based on the engine used. "
+ + "These record merger impls will filter by record-merger-strategy "
+ "Hudi will pick most efficient implementation to perform
merging/combining of the records (during update, reading MOR table, etc)")
- public String mergerImpls = HoodieAvroRecordMerger.class.getName();
+ public String recordMergerImpls = HoodieAvroRecordMerger.class.getName();
- @Parameter(names = {"--merger-strategy"}, description = "Id of merger
strategy. Hudi will pick HoodieRecordMerger implementations in merger-impls
which has the same merger strategy id")
- public String mergerStrategy =
HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+ @Parameter(names = {"--record-merger-strategy"}, description = "Id of record
merger strategy. Hudi will pick HoodieRecordMerger implementations in
record-merger-impls "
+ + "which has the same record merger strategy id")
+ public String recordMergerStrategy =
HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
@Parameter(names = {"--op"}, description = "Takes one of these values :
UPSERT (default), INSERT (use when input "
+ "is purely new data/inserts to gain speed).", converter =
OperationConverter.class)
@@ -407,8 +408,8 @@ public class FlinkStreamerConfig extends Configuration {
conf.setString(FlinkOptions.OPERATION, config.operation.value());
conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, config.payloadClassName);
- conf.setString(FlinkOptions.RECORD_MERGER_IMPLS, config.mergerImpls);
- conf.setString(FlinkOptions.RECORD_MERGER_STRATEGY, config.mergerStrategy);
+ conf.setString(FlinkOptions.RECORD_MERGER_IMPLS, config.recordMergerImpls);
+ conf.setString(FlinkOptions.RECORD_MERGER_STRATEGY,
config.recordMergerStrategy);
conf.setBoolean(FlinkOptions.PRE_COMBINE, config.preCombine);
conf.setInteger(FlinkOptions.RETRY_TIMES,
Integer.parseInt(config.instantRetryTimes));
conf.setLong(FlinkOptions.RETRY_INTERVAL_MS,
Long.parseLong(config.instantRetryInterval));
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
index a9cf806b19d..561851c8e2b 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
@@ -160,7 +160,7 @@ public class TestInputPathHandler {
properties.setProperty(HoodieTableConfig.NAME.key(), tableName);
properties.setProperty(HoodieTableConfig.TYPE.key(), tableType.name());
properties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(),
HoodieAvroPayload.class.getName());
- properties.setProperty(HoodieTableConfig.MERGER_STRATEGY.key(),
HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID);
+ properties.setProperty(HoodieTableConfig.RECORD_MERGER_STRATEGY.key(),
HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID);
return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf,
basePath, properties);
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 32c58665572..5de9cc8a411 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -344,12 +344,12 @@ object DataSourceWriteOptions {
* HoodieMerger will replace the payload to process the merge of data
* and provide the same capabilities as the payload
*/
- val MERGER_IMPLS = HoodieWriteConfig.MERGER_IMPLS
+ val RECORD_MERGER_IMPLS = HoodieWriteConfig.RECORD_MERGER_IMPLS
/**
* Id of merger strategy
*/
- val MERGER_STRATEGY = HoodieWriteConfig.MERGER_STRATEGY
+ val RECORD_MERGER_STRATEGY = HoodieWriteConfig.RECORD_MERGER_STRATEGY
/**
* Record key field. Value to be used as the `recordKey` component of
`HoodieKey`. Actual value
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 9f30db6158f..80914337eec 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -78,8 +78,8 @@ case class HoodieTableState(tablePath: String,
usesVirtualKeys: Boolean,
recordPayloadClassName: String,
metadataConfig: HoodieMetadataConfig,
- mergerImpls: List[String],
- mergerStrategy: String)
+ recordMergerImpls: List[String],
+ recordMergerStrategy: String)
/**
* Hoodie BaseRelation which extends [[PrunedFilteredScan]].
@@ -460,10 +460,10 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
}
protected def getTableState: HoodieTableState = {
- val mergerImpls =
ConfigUtils.split2List(getConfigValue(HoodieWriteConfig.MERGER_IMPLS)).asScala.toList
+ val recordMergerImpls =
ConfigUtils.split2List(getConfigValue(HoodieWriteConfig.RECORD_MERGER_IMPLS)).asScala.toList
- val mergerStrategy = getConfigValue(HoodieWriteConfig.MERGER_STRATEGY,
- Option(metaClient.getTableConfig.getMergerStrategy))
+ val recordMergerStrategy =
getConfigValue(HoodieWriteConfig.RECORD_MERGER_STRATEGY,
+ Option(metaClient.getTableConfig.getRecordMergerStrategy))
// Subset of the state of table's configuration as of at the time of the
query
HoodieTableState(
@@ -474,8 +474,8 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
usesVirtualKeys = !tableConfig.populateMetaFields(),
recordPayloadClassName = tableConfig.getPayloadClass,
metadataConfig = fileIndex.metadataConfig,
- mergerImpls = mergerImpls,
- mergerStrategy = mergerStrategy
+ recordMergerImpls = recordMergerImpls,
+ recordMergerStrategy = recordMergerStrategy
)
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index d0e270ed136..90535348ae6 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -18,7 +18,7 @@
package org.apache.hudi
import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
-import org.apache.hudi.DataSourceWriteOptions.{MERGER_IMPLS, _}
+import org.apache.hudi.DataSourceWriteOptions.{RECORD_MERGER_IMPLS, _}
import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
import org.apache.hudi.common.config.{DFSPropertiesConfiguration,
HoodieCommonConfig, HoodieConfig}
import org.apache.hudi.common.table.HoodieTableConfig
@@ -224,7 +224,7 @@ object HoodieWriterUtils {
PARTITIONPATH_FIELD -> HoodieTableConfig.PARTITION_FIELDS,
RECORDKEY_FIELD -> HoodieTableConfig.RECORDKEY_FIELDS,
PAYLOAD_CLASS_NAME -> HoodieTableConfig.PAYLOAD_CLASS_NAME,
- MERGER_STRATEGY -> HoodieTableConfig.MERGER_STRATEGY
+ RECORD_MERGER_STRATEGY -> HoodieTableConfig.RECORD_MERGER_STRATEGY
)
def mappingSparkDatasourceConfigsToTableConfigs(options: Map[String,
String]): Map[String, String] = {
val includingTableConfigs = scala.collection.mutable.Map() ++ options
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
index 7e18f7a9746..d81745156a6 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -195,7 +195,7 @@ class RecordMergingFileIterator(split:
HoodieMergeOnReadFileSplit,
private val baseFileIterator = baseFileReader(split.dataFile.get)
- private val recordMerger =
HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK,
tableState.mergerImpls.asJava, tableState.mergerStrategy)
+ private val recordMerger =
HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK,
tableState.recordMergerImpls.asJava, tableState.recordMergerStrategy)
override def doHasNext: Boolean = hasNextInternal
@@ -316,7 +316,7 @@ object LogFileIterator {
getRelativePartitionPath(new Path(tableState.tablePath),
logFiles.head.getPath.getParent))
}
- val recordMerger =
HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK,
tableState.mergerImpls.asJava, tableState.mergerStrategy)
+ val recordMerger =
HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK,
tableState.recordMergerImpls.asJava, tableState.recordMergerStrategy)
logRecordScannerBuilder.withRecordMerger(recordMerger)
logRecordScannerBuilder.build()
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 9df5586b3d9..17a1cab27fe 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -161,8 +161,8 @@ class HoodieCDCRDD(
metaClient.getTableConfig.getPayloadClass,
metadataConfig,
// TODO support CDC with spark record
- mergerImpls = List(classOf[HoodieAvroRecordMerger].getName),
- mergerStrategy = HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
+ recordMergerImpls = List(classOf[HoodieAvroRecordMerger].getName),
+ recordMergerStrategy = HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
)
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index 7219af6c4af..7da69b2c0bb 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -69,10 +69,10 @@ object HoodieOptionConfig {
.defaultValue(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue())
.build()
- val SQL_MERGER_STRATEGY: HoodieSQLOption[String] = buildConf()
- .withSqlKey("mergerStrategy")
- .withHoodieKey(DataSourceWriteOptions.MERGER_STRATEGY.key)
- .withTableConfigKey(HoodieTableConfig.MERGER_STRATEGY.key)
+ val SQL_RECORD_MERGER_STRATEGY: HoodieSQLOption[String] = buildConf()
+ .withSqlKey("recordMergerStrategy")
+ .withHoodieKey(DataSourceWriteOptions.RECORD_MERGER_STRATEGY.key)
+ .withTableConfigKey(HoodieTableConfig.RECORD_MERGER_STRATEGY.key)
.defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
.build()
@@ -197,7 +197,7 @@ object HoodieOptionConfig {
// extract primaryKey, preCombineField, type options
def extractSqlOptions(options: Map[String, String]): Map[String, String] = {
val sqlOptions = mapTableConfigsToSqlOptions(options)
- val targetOptions = sqlOptionKeyToWriteConfigKey.keySet --
Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_MERGER_STRATEGY.sqlKeyName)
+ val targetOptions = sqlOptionKeyToWriteConfigKey.keySet --
Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_RECORD_MERGER_STRATEGY.sqlKeyName)
sqlOptions.filterKeys(targetOptions.contains)
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index db2b93eda08..bf6b6509b95 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -66,7 +66,7 @@ trait ProvidesHoodieConfig extends Logging {
RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
TBL_NAME.key -> hoodieCatalogTable.tableName,
PRECOMBINE_FIELD.key -> preCombineField,
- MERGER_IMPLS.key ->
hoodieProps.getString(HoodieWriteConfig.MERGER_IMPLS.key,
HoodieWriteConfig.MERGER_IMPLS.defaultValue),
+ RECORD_MERGER_IMPLS.key ->
hoodieProps.getString(HoodieWriteConfig.RECORD_MERGER_IMPLS.key,
HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue),
HIVE_STYLE_PARTITIONING.key ->
tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
KEYGENERATOR_CLASS_NAME.key ->
classOf[SqlKeyGenerator].getCanonicalName,
@@ -193,7 +193,7 @@ trait ProvidesHoodieConfig extends Logging {
PRECOMBINE_FIELD.key -> preCombineField,
PARTITIONPATH_FIELD.key -> partitionFieldsStr,
PAYLOAD_CLASS_NAME.key -> payloadClassName,
- MERGER_IMPLS.key ->
hoodieProps.getString(HoodieWriteConfig.MERGER_IMPLS.key,
HoodieWriteConfig.MERGER_IMPLS.defaultValue),
+ RECORD_MERGER_IMPLS.key ->
hoodieProps.getString(HoodieWriteConfig.RECORD_MERGER_IMPLS.key,
HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue),
ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key ->
String.valueOf(hasPrecombineColumn),
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index d4cba17f4d4..7befb97c7b8 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -189,7 +189,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// Create the write parameters
val parameters = buildMergeIntoConfig(hoodieCatalogTable)
// TODO Remove it when we implement ExpressionPayload for SparkRecord
- val parametersWithAvroRecordMerger = parameters ++
Map(HoodieWriteConfig.MERGER_IMPLS.key ->
classOf[HoodieAvroRecordMerger].getName)
+ val parametersWithAvroRecordMerger = parameters ++
Map(HoodieWriteConfig.RECORD_MERGER_IMPLS.key ->
classOf[HoodieAvroRecordMerger].getName)
executeUpsert(sourceDF, parametersWithAvroRecordMerger)
sparkSession.catalog.refreshTable(targetTableIdentify.unquotedString)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 21b6a34c94f..ea34123be43 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -73,7 +73,7 @@ class TestCOWDataSource extends HoodieClientTestBase with
ScalaAssertionSupport
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
)
- val sparkOpts = Map(HoodieWriteConfig.MERGER_IMPLS.key ->
classOf[HoodieSparkRecordMerger].getName)
+ val sparkOpts = Map(HoodieWriteConfig.RECORD_MERGER_IMPLS.key ->
classOf[HoodieSparkRecordMerger].getName)
val verificationCol: String = "driver"
val updatedVerificationVal: String = "driver_update"
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index e6e850d6212..184cb97f2be 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -68,7 +68,7 @@ class TestMORDataSource extends HoodieClientTestBase with
SparkDatasetMixin {
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
)
val sparkOpts = Map(
- HoodieWriteConfig.MERGER_IMPLS.key ->
classOf[HoodieSparkRecordMerger].getName,
+ HoodieWriteConfig.RECORD_MERGER_IMPLS.key ->
classOf[HoodieSparkRecordMerger].getName,
HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet"
)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala
index 3547e42148d..582a6b4cf20 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala
@@ -82,7 +82,7 @@ object ReadAndWriteWithoutAvroBenchmark extends
HoodieBenchmarkBase {
if (spark.catalog.tableExists(tableName)) {
spark.sql(s"drop table if exists $tableName")
}
- spark.sql(s"set ${HoodieWriteConfig.MERGER_IMPLS.key} = $mergerImpl")
+ spark.sql(s"set ${HoodieWriteConfig.RECORD_MERGER_IMPLS.key} =
$mergerImpl")
spark.sql(
s"""
|create table $tableName(
@@ -157,7 +157,7 @@ object ReadAndWriteWithoutAvroBenchmark extends
HoodieBenchmarkBase {
prepareHoodieTable(sparkTable, new Path(sparkPath.getCanonicalPath,
sparkTable).toUri.toString, "mor", sparkMergerImpl, df)
Seq(avroMergerImpl, sparkMergerImpl).zip(Seq(avroTable,
sparkTable)).foreach {
case (mergerImpl, tableName) => upsertBenchmark.addCase(mergerImpl)
{ _ =>
- spark.sql(s"set ${HoodieWriteConfig.MERGER_IMPLS.key} =
$mergerImpl")
+ spark.sql(s"set ${HoodieWriteConfig.RECORD_MERGER_IMPLS.key} =
$mergerImpl")
spark.sql(s"update $tableName set s1 = 's1_new_1' where id > 0")
}
}
@@ -166,7 +166,7 @@ object ReadAndWriteWithoutAvroBenchmark extends
HoodieBenchmarkBase {
val readBenchmark = new HoodieBenchmark("pref read", 10000, 3)
Seq(avroMergerImpl, sparkMergerImpl).zip(Seq(avroTable,
sparkTable)).foreach {
case (mergerImpl, tableName) => readBenchmark.addCase(mergerImpl) {
_ =>
- spark.sql(s"set ${HoodieWriteConfig.MERGER_IMPLS.key} =
$mergerImpl")
+ spark.sql(s"set ${HoodieWriteConfig.RECORD_MERGER_IMPLS.key} =
$mergerImpl")
spark.sql(s"select * from $tableName").collect()
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
index a185b3849bd..cf37f7436fd 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
@@ -200,7 +200,7 @@ class HoodieSparkSqlTestBase extends FunSuite with
BeforeAndAfterAll {
case _ => (classOf[HoodieAvroRecordMerger].getName, "avro")
}
val config = Map(
- HoodieWriteConfig.MERGER_IMPLS.key -> merger,
+ HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> merger,
HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> format) ++
recordConfig.getOrElse(recordType, Map.empty)
withSQLConf(config.toList:_*) {
f
@@ -211,7 +211,7 @@ class HoodieSparkSqlTestBase extends FunSuite with
BeforeAndAfterAll {
}
protected def getRecordType(): HoodieRecordType = {
- val merger =
spark.sessionState.conf.getConfString(HoodieWriteConfig.MERGER_IMPLS.key,
HoodieWriteConfig.MERGER_IMPLS.defaultValue())
+ val merger =
spark.sessionState.conf.getConfString(HoodieWriteConfig.RECORD_MERGER_IMPLS.key,
HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue())
if (merger.equals(classOf[HoodieSparkRecordMerger].getName)) {
HoodieRecordType.SPARK
} else {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
index 2cb9c98878b..4e726317844 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
@@ -36,13 +36,13 @@ class TestHoodieOptionConfig extends
SparkClientFunctionalTestHarness {
assertTrue(with1("primaryKey") == "id")
assertTrue(with1("type") == "cow")
assertTrue(with1("payloadClass") ==
classOf[OverwriteWithLatestAvroPayload].getName)
- assertTrue(with1("mergerStrategy") ==
HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
+ assertTrue(with1("recordMergerStrategy") ==
HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
val ops2 = Map("primaryKey" -> "id",
"preCombineField" -> "timestamp",
"type" -> "mor",
"payloadClass" -> classOf[DefaultHoodieRecordPayload].getName,
- "mergerStrategy" -> HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
+ "recordMergerStrategy" -> HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
)
val with2 = HoodieOptionConfig.withDefaultSqlOptions(ops2)
assertTrue(ops2 == with2)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index b6444b52b52..868a96ebc2c 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -231,7 +231,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
withRecordType(Map(HoodieRecordType.SPARK ->
// SparkMerger should use "HoodieSparkValidateDuplicateKeyRecordMerger"
// with "hoodie.sql.insert.mode=strict"
- Map(HoodieWriteConfig.MERGER_IMPLS.key ->
+ Map(HoodieWriteConfig.RECORD_MERGER_IMPLS.key ->
classOf[HoodieSparkValidateDuplicateKeyRecordMerger].getName)))(withTempDir {
tmp =>
val tableName = generateTableName
spark.sql(s"set hoodie.sql.insert.mode=strict")
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 14d6d9afaee..cd46e9f3789 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -630,7 +630,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
}
val sparkOpts = Map(
- HoodieWriteConfig.MERGER_IMPLS.key ->
classOf[HoodieSparkRecordMerger].getName,
+ HoodieWriteConfig.RECORD_MERGER_IMPLS.key ->
classOf[HoodieSparkRecordMerger].getName,
HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet"
)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala
index 72bd71e4bba..dbf6d173865 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala
@@ -86,7 +86,7 @@ class TestSqlConf extends HoodieSparkSqlTestBase with
BeforeAndAfter {
new Path(tablePath).getFileSystem(new Configuration),
s"$tablePath/" + HoodieTableMetaClient.METAFOLDER_NAME,
HoodieTableConfig.PAYLOAD_CLASS_NAME.defaultValue,
- HoodieTableConfig.MERGER_STRATEGY.defaultValue).getTableType)
+ HoodieTableConfig.RECORD_MERGER_STRATEGY.defaultValue).getTableType)
// Manually pass incremental configs to global configs to make sure Hudi
query is able to load the
// global configs
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 10dbf998a5f..63cf706c174 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -281,7 +281,7 @@ public class DeltaSync implements Serializable, Closeable {
.setConf(new Configuration(fs.getConf()))
.setBasePath(cfg.targetBasePath)
.setPayloadClassName(cfg.payloadClassName)
-
.setMergerStrategy(props.getProperty(HoodieWriteConfig.MERGER_STRATEGY.key(),
HoodieWriteConfig.MERGER_STRATEGY.defaultValue()))
+
.setRecordMergerStrategy(props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(),
HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()))
.build();
switch (meta.getTableType()) {
case COPY_ON_WRITE:
@@ -474,8 +474,8 @@ public class DeltaSync implements Serializable, Closeable {
private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>>
fetchFromSource(Option<String> resumeCheckpointStr) {
HoodieRecordType recordType = HoodieRecordUtils.createRecordMerger(null,
EngineType.SPARK,
-
ConfigUtils.split2List(props.getProperty(HoodieWriteConfig.MERGER_IMPLS.key(),
HoodieWriteConfig.MERGER_IMPLS.defaultValue())),
- props.getProperty(HoodieWriteConfig.MERGER_STRATEGY.key(),
HoodieWriteConfig.MERGER_STRATEGY.defaultValue())).getRecordType();
+
ConfigUtils.split2List(props.getProperty(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(),
HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue())),
+ props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(),
HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue())).getRecordType();
if (recordType == HoodieRecordType.SPARK &&
HoodieTableType.valueOf(cfg.tableType) == HoodieTableType.MERGE_ON_READ
&&
HoodieLogBlockType.fromId(props.getProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),
"avro"))
!= HoodieLogBlockType.PARQUET_DATA_BLOCK) {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 60b870b6549..ffa2a812f6f 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -272,7 +272,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
static void addRecordMerger(HoodieRecordType type, List<String>
hoodieConfig) {
if (type == HoodieRecordType.SPARK) {
- hoodieConfig.add(String.format("%s=%s",
HoodieWriteConfig.MERGER_IMPLS.key(), HoodieSparkRecordMerger.class.getName()));
+ hoodieConfig.add(String.format("%s=%s",
HoodieWriteConfig.RECORD_MERGER_IMPLS.key(),
HoodieSparkRecordMerger.class.getName()));
hoodieConfig.add(String.format("%s=%s",
HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),"parquet"));
}
}
diff --git a/rfc/rfc-46/rfc-46.md b/rfc/rfc-46/rfc-46.md
index c16ff178f77..d7d08f4a7ae 100644
--- a/rfc/rfc-46/rfc-46.md
+++ b/rfc/rfc-46/rfc-46.md
@@ -148,7 +148,7 @@ class HoodieFlinkRecordMerger implements HoodieRecordMerger
{
Where user can provide their own subclass implementing such interface for the
engines of interest.
### Merging Strategy
-The RecordMerger is engine-aware. We provide a config called
HoodieWriteConfig.MERGER_IMPLS. You can set a list of RecordMerger class name
to it. And you can set HoodieWriteConfig.MERGER_STRATEGY which is UUID of
RecordMerger. Hudi will pick RecordMergers in MERGER_IMPLS which has the same
MERGER_STRATEGY according to the engine type at runtime.
+The RecordMerger is engine-aware. We provide a config called
HoodieWriteConfig.RECORD_MERGER_IMPLS. You can set a list of RecordMerger class
names in it. And you can set HoodieWriteConfig.RECORD_MERGER_STRATEGY which is
the UUID of a RecordMerger. At runtime, Hudi will pick the RecordMergers in
RECORD_MERGER_IMPLS that have the same RECORD_MERGER_STRATEGY, according to the
engine type.
- Every RecordMerger implementation being engine-specific (referred to as
"implementation"), implements particular merging semantic (referred to as
"merging strategy")
- Such tiering allowing us to be flexible in terms of providing
implementations for the merging strategy only for engines you might be
interested in
- Merging strategy is a table property that is set once during init