This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ec5022b4fdd [MINOR] Unify naming for record merger (#7660)
ec5022b4fdd is described below

commit ec5022b4fdd94c106bf243038aadf781f31b5be9
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Jan 18 12:30:52 2023 -0500

    [MINOR] Unify naming for record merger (#7660)
    
    Addresses the inconsistent usage of "merger impls"/"merger strategy" versus
    "record merger impls"/"record merger strategy" by standardizing on the
    "record merger" naming.
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 31 +++++++++---------
 .../functional/TestHoodieBackedMetadata.java       |  6 ++--
 .../hudi/common/model/HoodieAvroRecordMerger.java  |  2 +-
 .../hudi/common/table/HoodieTableConfig.java       | 20 ++++++------
 .../hudi/common/table/HoodieTableMetaClient.java   | 38 +++++++++++-----------
 .../apache/hudi/common/util/HoodieRecordUtils.java |  4 +--
 .../apache/hudi/streamer/FlinkStreamerConfig.java  | 15 +++++----
 .../apache/hudi/hadoop/TestInputPathHandler.java   |  2 +-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  4 +--
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 14 ++++----
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |  4 +--
 .../src/main/scala/org/apache/hudi/Iterators.scala |  4 +--
 .../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala   |  4 +--
 .../apache/spark/sql/hudi/HoodieOptionConfig.scala | 10 +++---
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |  4 +--
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  2 +-
 .../apache/hudi/functional/TestCOWDataSource.scala |  2 +-
 .../apache/hudi/functional/TestMORDataSource.scala |  2 +-
 .../ReadAndWriteWithoutAvroBenchmark.scala         |  6 ++--
 .../spark/sql/hudi/HoodieSparkSqlTestBase.scala    |  4 +--
 .../spark/sql/hudi/TestHoodieOptionConfig.scala    |  4 +--
 .../apache/spark/sql/hudi/TestInsertTable.scala    |  2 +-
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  |  2 +-
 .../org/apache/spark/sql/hudi/TestSqlConf.scala    |  2 +-
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  6 ++--
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  2 +-
 rfc/rfc-46/rfc-46.md                               |  2 +-
 27 files changed, 100 insertions(+), 98 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 879136206e2..7367826e50d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -135,17 +135,17 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Payload class used. Override this, if you like to 
roll your own merge logic, when upserting/inserting. "
           + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL 
in-effective");
 
-  public static final ConfigProperty<String> MERGER_IMPLS = ConfigProperty
-      .key("hoodie.datasource.write.merger.impls")
+  public static final ConfigProperty<String> RECORD_MERGER_IMPLS = 
ConfigProperty
+      .key("hoodie.datasource.write.record.merger.impls")
       .defaultValue(HoodieAvroRecordMerger.class.getName())
       .withDocumentation("List of HoodieMerger implementations constituting 
Hudi's merging strategy -- based on the engine used. "
-          + "These merger impls will filter by 
hoodie.datasource.write.merger.strategy "
+          + "These merger impls will filter by 
hoodie.datasource.write.record.merger.strategy "
           + "Hudi will pick most efficient implementation to perform 
merging/combining of the records (during update, reading MOR table, etc)");
 
-  public static final ConfigProperty<String> MERGER_STRATEGY = ConfigProperty
-      .key("hoodie.datasource.write.merger.strategy")
+  public static final ConfigProperty<String> RECORD_MERGER_STRATEGY = 
ConfigProperty
+      .key("hoodie.datasource.write.record.merger.strategy")
       .defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
-      .withDocumentation("Id of merger strategy. Hudi will pick 
HoodieRecordMerger implementations in hoodie.datasource.write.merger.impls 
which has the same merger strategy id");
+      .withDocumentation("Id of merger strategy. Hudi will pick 
HoodieRecordMerger implementations in 
hoodie.datasource.write.record.merger.impls which has the same merger strategy 
id");
 
   public static final ConfigProperty<String> KEYGENERATOR_CLASS_NAME = 
ConfigProperty
       .key("hoodie.datasource.write.keygenerator.class")
@@ -971,12 +971,12 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public HoodieRecordMerger getRecordMerger() {
-    List<String> mergers = getSplitStringsOrDefault(MERGER_IMPLS).stream()
+    List<String> mergers = 
getSplitStringsOrDefault(RECORD_MERGER_IMPLS).stream()
         .map(String::trim)
         .distinct()
         .collect(Collectors.toList());
-    String mergerStrategy = getString(MERGER_STRATEGY);
-    return HoodieRecordUtils.createRecordMerger(getString(BASE_PATH), 
engineType, mergers, mergerStrategy);
+    String recordMergerStrategy = getString(RECORD_MERGER_STRATEGY);
+    return HoodieRecordUtils.createRecordMerger(getString(BASE_PATH), 
engineType, mergers, recordMergerStrategy);
   }
 
   public String getSchema() {
@@ -987,8 +987,8 @@ public class HoodieWriteConfig extends HoodieConfig {
     setValue(AVRO_SCHEMA_STRING, schemaStr);
   }
 
-  public void setMergerClass(String mergerStrategy) {
-    setValue(MERGER_STRATEGY, mergerStrategy);
+  public void setRecordMergerClass(String recordMergerStrategy) {
+    setValue(RECORD_MERGER_STRATEGY, recordMergerStrategy);
   }
 
   /**
@@ -1934,6 +1934,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   public String getDatadogApiKey() {
     if (props.containsKey(HoodieMetricsDatadogConfig.API_KEY.key())) {
       return getString(HoodieMetricsDatadogConfig.API_KEY);
+      
     } else {
       Supplier<String> apiKeySupplier = ReflectionUtils.loadClass(
           getString(HoodieMetricsDatadogConfig.API_KEY_SUPPLIER));
@@ -2391,13 +2392,13 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
-    public Builder withMergerImpls(String mergerImpls) {
-      writeConfig.setValue(MERGER_IMPLS, mergerImpls);
+    public Builder withRecordMergerImpls(String recordMergerImpls) {
+      writeConfig.setValue(RECORD_MERGER_IMPLS, recordMergerImpls);
       return this;
     }
 
-    public Builder withMergerStrategy(String mergerStrategy) {
-      writeConfig.setValue(MERGER_STRATEGY, mergerStrategy);
+    public Builder withRecordMergerStrategy(String recordMergerStrategy) {
+      writeConfig.setValue(RECORD_MERGER_STRATEGY, recordMergerStrategy);
       return this;
     }
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 26c8ef32734..bb97b6e858e 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -358,7 +358,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
 
     assertTrue(metadataWriter.isPresent());
     HoodieTableConfig hoodieTableConfig =
-        new HoodieTableConfig(this.fs, metaClient.getMetaPath(), 
writeConfig.getPayloadClass(), 
writeConfig.getStringOrDefault(HoodieWriteConfig.MERGER_IMPLS));
+        new HoodieTableConfig(this.fs, metaClient.getMetaPath(), 
writeConfig.getPayloadClass(), 
writeConfig.getStringOrDefault(HoodieWriteConfig.RECORD_MERGER_IMPLS));
     assertFalse(hoodieTableConfig.getMetadataPartitions().isEmpty());
 
     // Turn off metadata table
@@ -375,7 +375,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     assertFalse(metadataWriter2.isPresent());
 
     HoodieTableConfig hoodieTableConfig2 =
-        new HoodieTableConfig(this.fs, metaClient.getMetaPath(), 
writeConfig2.getPayloadClass(), 
writeConfig.getStringOrDefault(HoodieWriteConfig.MERGER_IMPLS));
+        new HoodieTableConfig(this.fs, metaClient.getMetaPath(), 
writeConfig2.getPayloadClass(), 
writeConfig.getStringOrDefault(HoodieWriteConfig.RECORD_MERGER_IMPLS));
     assertEquals(Collections.emptySet(), 
hoodieTableConfig2.getMetadataPartitions());
     // Assert metadata table folder is deleted
     assertFalse(metaClient.getFs().exists(
@@ -397,7 +397,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     validateMetadata(testTable, true);
     assertTrue(metadataWriter3.isPresent());
     HoodieTableConfig hoodieTableConfig3 =
-        new HoodieTableConfig(this.fs, metaClient.getMetaPath(), 
writeConfig.getPayloadClass(), 
writeConfig.getStringOrDefault(HoodieWriteConfig.MERGER_IMPLS));
+        new HoodieTableConfig(this.fs, metaClient.getMetaPath(), 
writeConfig.getPayloadClass(), 
writeConfig.getStringOrDefault(HoodieWriteConfig.RECORD_MERGER_IMPLS));
     assertFalse(hoodieTableConfig3.getMetadataPartitions().isEmpty());
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
index b9e29787f8a..e49d560b74c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
@@ -98,7 +98,7 @@ public class HoodieAvroRecordMerger implements 
HoodieRecordMerger {
     }
 
     public static ConfigProperty<String> LEGACY_OPERATING_MODE =
-        ConfigProperty.key("hoodie.datasource.write.merger.legacy.operation")
+        
ConfigProperty.key("hoodie.datasource.write.record.merger.legacy.operation")
             .defaultValue(LegacyOperationMode.COMBINING.name())
             .withDocumentation("Controls the mode of the merging operation 
performed by `HoodieAvroRecordMerger`. "
                 + "This is required to maintain backward-compatibility w/ the 
existing semantic of `HoodieRecordPayload` "
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 549d5559c2f..90b10c60ed0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -173,10 +173,10 @@ public class HoodieTableConfig extends HoodieConfig {
       .withDocumentation("Payload class to use for performing compactions, i.e 
merge delta logs with current base file and then "
           + " produce a new base file.");
 
-  public static final ConfigProperty<String> MERGER_STRATEGY = ConfigProperty
-      .key("hoodie.compaction.merger.strategy")
+  public static final ConfigProperty<String> RECORD_MERGER_STRATEGY = 
ConfigProperty
+      .key("hoodie.compaction.record.merger.strategy")
       .defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
-      .withDocumentation("Id of merger strategy. Hudi will pick 
HoodieRecordMerger implementations in hoodie.datasource.write.merger.impls 
which has the same merger strategy id");
+      .withDocumentation("Id of merger strategy. Hudi will pick 
HoodieRecordMerger implementations in 
hoodie.datasource.write.record.merger.impls which has the same merger strategy 
id");
 
   public static final ConfigProperty<String> ARCHIVELOG_FOLDER = ConfigProperty
       .key("hoodie.archivelog.folder")
@@ -265,7 +265,7 @@ public class HoodieTableConfig extends HoodieConfig {
 
   private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; // 
<database_name>.<table_name>
 
-  public HoodieTableConfig(FileSystem fs, String metaPath, String 
payloadClassName, String mergerStrategyId) {
+  public HoodieTableConfig(FileSystem fs, String metaPath, String 
payloadClassName, String recordMergerStrategyId) {
     super();
     Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
     LOG.info("Loading table properties from " + propertyPath);
@@ -277,9 +277,9 @@ public class HoodieTableConfig extends HoodieConfig {
         setValue(PAYLOAD_CLASS_NAME, payloadClassName);
         needStore = true;
       }
-      if (contains(MERGER_STRATEGY) && payloadClassName != null
-          && !getString(MERGER_STRATEGY).equals(mergerStrategyId)) {
-        setValue(MERGER_STRATEGY, mergerStrategyId);
+      if (contains(RECORD_MERGER_STRATEGY) && payloadClassName != null
+          && 
!getString(RECORD_MERGER_STRATEGY).equals(recordMergerStrategyId)) {
+        setValue(RECORD_MERGER_STRATEGY, recordMergerStrategyId);
         needStore = true;
       }
       if (needStore) {
@@ -449,7 +449,7 @@ public class HoodieTableConfig extends HoodieConfig {
       hoodieConfig.setDefaultValue(TYPE);
       if 
(hoodieConfig.getString(TYPE).equals(HoodieTableType.MERGE_ON_READ.name())) {
         hoodieConfig.setDefaultValue(PAYLOAD_CLASS_NAME);
-        hoodieConfig.setDefaultValue(MERGER_STRATEGY);
+        hoodieConfig.setDefaultValue(RECORD_MERGER_STRATEGY);
       }
       hoodieConfig.setDefaultValue(ARCHIVELOG_FOLDER);
       if (!hoodieConfig.contains(TIMELINE_LAYOUT_VERSION)) {
@@ -522,8 +522,8 @@ public class HoodieTableConfig extends HoodieConfig {
   /**
    * Read the payload class for HoodieRecords from the table properties.
    */
-  public String getMergerStrategy() {
-    return getStringOrDefault(MERGER_STRATEGY);
+  public String getRecordMergerStrategy() {
+    return getStringOrDefault(RECORD_MERGER_STRATEGY);
   }
 
   public String getPreCombineField() {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 1ff0a589af9..672d057f1e1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -126,7 +126,7 @@ public class HoodieTableMetaClient implements Serializable {
    */
   protected HoodieTableMetaClient(Configuration conf, String basePath, boolean 
loadActiveTimelineOnLoad,
                                 ConsistencyGuardConfig consistencyGuardConfig, 
Option<TimelineLayoutVersion> layoutVersion,
-                                String payloadClassName, String 
mergerStrategy, FileSystemRetryConfig fileSystemRetryConfig) {
+                                String payloadClassName, String 
recordMergerStrategy, FileSystemRetryConfig fileSystemRetryConfig) {
     LOG.info("Loading HoodieTableMetaClient from " + basePath);
     this.consistencyGuardConfig = consistencyGuardConfig;
     this.fileSystemRetryConfig = fileSystemRetryConfig;
@@ -135,7 +135,7 @@ public class HoodieTableMetaClient implements Serializable {
     this.metaPath = new SerializablePath(new CachingPath(basePath, 
METAFOLDER_NAME));
     this.fs = getFs();
     TableNotFoundException.checkTableValidity(fs, this.basePath.get(), 
metaPath.get());
-    this.tableConfig = new HoodieTableConfig(fs, metaPath.toString(), 
payloadClassName, mergerStrategy);
+    this.tableConfig = new HoodieTableConfig(fs, metaPath.toString(), 
payloadClassName, recordMergerStrategy);
     this.tableType = tableConfig.getTableType();
     Option<TimelineLayoutVersion> tableConfigVersion = 
tableConfig.getTimelineLayoutVersion();
     if (layoutVersion.isPresent() && tableConfigVersion.isPresent()) {
@@ -170,7 +170,7 @@ public class HoodieTableMetaClient implements Serializable {
         .setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig)
         .setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion))
         .setPayloadClassName(null)
-        .setMergerStrategy(null)
+        .setRecordMergerStrategy(null)
         .setFileSystemRetryConfig(oldMetaClient.fileSystemRetryConfig).build();
   }
 
@@ -680,17 +680,17 @@ public class HoodieTableMetaClient implements 
Serializable {
 
   private static HoodieTableMetaClient newMetaClient(Configuration conf, 
String basePath, boolean loadActiveTimelineOnLoad,
       ConsistencyGuardConfig consistencyGuardConfig, 
Option<TimelineLayoutVersion> layoutVersion,
-      String payloadClassName, String mergerStrategy, FileSystemRetryConfig 
fileSystemRetryConfig, Properties props) {
+      String payloadClassName, String recordMergerStrategy, 
FileSystemRetryConfig fileSystemRetryConfig, Properties props) {
     HoodieMetaserverConfig metaserverConfig = null == props
         ? new HoodieMetaserverConfig.Builder().build()
         : new HoodieMetaserverConfig.Builder().fromProperties(props).build();
     return metaserverConfig.isMetaserverEnabled()
         ? (HoodieTableMetaClient) 
ReflectionUtils.loadClass("org.apache.hudi.common.table.HoodieTableMetaserverClient",
         new Class<?>[] {Configuration.class, ConsistencyGuardConfig.class, 
String.class, FileSystemRetryConfig.class, String.class, String.class, 
HoodieMetaserverConfig.class},
-        conf, consistencyGuardConfig, mergerStrategy, fileSystemRetryConfig,
+        conf, consistencyGuardConfig, recordMergerStrategy, 
fileSystemRetryConfig,
         props.getProperty(HoodieTableConfig.DATABASE_NAME.key()), 
props.getProperty(HoodieTableConfig.NAME.key()), metaserverConfig)
         : new HoodieTableMetaClient(conf, basePath,
-        loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, 
payloadClassName, mergerStrategy, fileSystemRetryConfig);
+        loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, 
payloadClassName, recordMergerStrategy, fileSystemRetryConfig);
   }
 
   public static Builder builder() {
@@ -706,7 +706,7 @@ public class HoodieTableMetaClient implements Serializable {
     private String basePath;
     private boolean loadActiveTimelineOnLoad = false;
     private String payloadClassName = null;
-    private String mergerStrategy = null;
+    private String recordMergerStrategy = null;
     private ConsistencyGuardConfig consistencyGuardConfig = 
ConsistencyGuardConfig.newBuilder().build();
     private FileSystemRetryConfig fileSystemRetryConfig = 
FileSystemRetryConfig.newBuilder().build();
     private Option<TimelineLayoutVersion> layoutVersion = 
Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION);
@@ -732,8 +732,8 @@ public class HoodieTableMetaClient implements Serializable {
       return this;
     }
 
-    public Builder setMergerStrategy(String mergerStrategy) {
-      this.mergerStrategy = mergerStrategy;
+    public Builder setRecordMergerStrategy(String recordMergerStrategy) {
+      this.recordMergerStrategy = recordMergerStrategy;
       return this;
     }
 
@@ -762,7 +762,7 @@ public class HoodieTableMetaClient implements Serializable {
       ValidationUtils.checkArgument(basePath != null, "basePath needs to be 
set to init HoodieTableMetaClient");
       return newMetaClient(conf, basePath,
           loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, 
payloadClassName,
-          mergerStrategy, fileSystemRetryConfig, props);
+          recordMergerStrategy, fileSystemRetryConfig, props);
     }
   }
 
@@ -782,7 +782,7 @@ public class HoodieTableMetaClient implements Serializable {
     private String recordKeyFields;
     private String archiveLogFolder;
     private String payloadClassName;
-    private String mergerStrategy;
+    private String recordMergerStrategy;
     private Integer timelineLayoutVersion;
     private String baseFileFormat;
     private String preCombineField;
@@ -852,8 +852,8 @@ public class HoodieTableMetaClient implements Serializable {
       return this;
     }
 
-    public PropertyBuilder setMergerStrategy(String mergerStrategy) {
-      this.mergerStrategy = mergerStrategy;
+    public PropertyBuilder setRecordMergerStrategy(String 
recordMergerStrategy) {
+      this.recordMergerStrategy = recordMergerStrategy;
       return this;
     }
 
@@ -977,7 +977,7 @@ public class HoodieTableMetaClient implements Serializable {
           .setTableName(metaClient.getTableConfig().getTableName())
           .setArchiveLogFolder(metaClient.getArchivePath())
           .setPayloadClassName(metaClient.getTableConfig().getPayloadClass())
-          .setMergerStrategy(metaClient.getTableConfig().getMergerStrategy());
+          
.setRecordMergerStrategy(metaClient.getTableConfig().getRecordMergerStrategy());
     }
 
     public PropertyBuilder fromProperties(Properties properties) {
@@ -1007,9 +1007,9 @@ public class HoodieTableMetaClient implements 
Serializable {
         setPayloadClassName(
             hoodieConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME));
       }
-      if (hoodieConfig.contains(HoodieTableConfig.MERGER_STRATEGY)) {
-        setMergerStrategy(
-            hoodieConfig.getString(HoodieTableConfig.MERGER_STRATEGY));
+      if (hoodieConfig.contains(HoodieTableConfig.RECORD_MERGER_STRATEGY)) {
+        setRecordMergerStrategy(
+            hoodieConfig.getString(HoodieTableConfig.RECORD_MERGER_STRATEGY));
       }
       if (hoodieConfig.contains(HoodieTableConfig.TIMELINE_LAYOUT_VERSION)) {
         
setTimelineLayoutVersion(hoodieConfig.getInt(HoodieTableConfig.TIMELINE_LAYOUT_VERSION));
@@ -1097,8 +1097,8 @@ public class HoodieTableMetaClient implements 
Serializable {
       if (tableType == HoodieTableType.MERGE_ON_READ && payloadClassName != 
null) {
         tableConfig.setValue(HoodieTableConfig.PAYLOAD_CLASS_NAME, 
payloadClassName);
       }
-      if (tableType == HoodieTableType.MERGE_ON_READ && mergerStrategy != 
null) {
-        tableConfig.setValue(HoodieTableConfig.MERGER_STRATEGY, 
mergerStrategy);
+      if (tableType == HoodieTableType.MERGE_ON_READ && recordMergerStrategy 
!= null) {
+        tableConfig.setValue(HoodieTableConfig.RECORD_MERGER_STRATEGY, 
recordMergerStrategy);
       }
 
       if (null != tableCreateSchema) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
index 666a084877b..31f9e226c00 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -67,7 +67,7 @@ public class HoodieRecordUtils {
    * Instantiate a given class with a record merge.
    */
   public static HoodieRecordMerger createRecordMerger(String basePath, 
EngineType engineType,
-      List<String> mergerClassList, String mergerStrategy) {
+      List<String> mergerClassList, String recordMergerStrategy) {
     if (mergerClassList.isEmpty() || 
HoodieTableMetadata.isMetadataTable(basePath)) {
       return 
HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
     } else {
@@ -81,7 +81,7 @@ public class HoodieRecordUtils {
             }
           })
           .filter(Objects::nonNull)
-          .filter(merger -> merger.getMergingStrategy().equals(mergerStrategy))
+          .filter(merger -> 
merger.getMergingStrategy().equals(recordMergerStrategy))
           .filter(merger -> recordTypeCompatibleEngine(merger.getRecordType(), 
engineType))
           .findFirst()
           
.orElse(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()));
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 9454fd9e77e..e110cb6ba90 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -121,13 +121,14 @@ public class FlinkStreamerConfig extends Configuration {
       + "a GenericRecord. Implement your own, if you want to do something 
other than overwriting existing value.")
   public String payloadClassName = 
OverwriteWithLatestAvroPayload.class.getName();
 
-  @Parameter(names = {"--merger-impls"}, description = "List of HoodieMerger 
implementations constituting Hudi's merging strategy -- based on the engine 
used. "
-      + "These merger impls will filter by merger-strategy "
+  @Parameter(names = {"--record-merger-impls"}, description = "List of 
HoodieMerger implementations constituting Hudi's record merging strategy -- 
based on the engine used. "
+      + "These record merger impls will filter by record-merger-strategy "
       + "Hudi will pick most efficient implementation to perform 
merging/combining of the records (during update, reading MOR table, etc)")
-  public String mergerImpls = HoodieAvroRecordMerger.class.getName();
+  public String recordMergerImpls = HoodieAvroRecordMerger.class.getName();
 
-  @Parameter(names = {"--merger-strategy"}, description = "Id of merger 
strategy. Hudi will pick HoodieRecordMerger implementations in merger-impls 
which has the same merger strategy id")
-  public String mergerStrategy = 
HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+  @Parameter(names = {"--record-merger-strategy"}, description = "Id of record 
merger strategy. Hudi will pick HoodieRecordMerger implementations in 
record-merger-impls "
+      + "which has the same record merger strategy id")
+  public String recordMergerStrategy = 
HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
 
   @Parameter(names = {"--op"}, description = "Takes one of these values : 
UPSERT (default), INSERT (use when input "
       + "is purely new data/inserts to gain speed).", converter = 
OperationConverter.class)
@@ -407,8 +408,8 @@ public class FlinkStreamerConfig extends Configuration {
     conf.setString(FlinkOptions.OPERATION, config.operation.value());
     conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
     conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, config.payloadClassName);
-    conf.setString(FlinkOptions.RECORD_MERGER_IMPLS, config.mergerImpls);
-    conf.setString(FlinkOptions.RECORD_MERGER_STRATEGY, config.mergerStrategy);
+    conf.setString(FlinkOptions.RECORD_MERGER_IMPLS, config.recordMergerImpls);
+    conf.setString(FlinkOptions.RECORD_MERGER_STRATEGY, 
config.recordMergerStrategy);
     conf.setBoolean(FlinkOptions.PRE_COMBINE, config.preCombine);
     conf.setInteger(FlinkOptions.RETRY_TIMES, 
Integer.parseInt(config.instantRetryTimes));
     conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, 
Long.parseLong(config.instantRetryInterval));
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
index a9cf806b19d..561851c8e2b 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
@@ -160,7 +160,7 @@ public class TestInputPathHandler {
     properties.setProperty(HoodieTableConfig.NAME.key(), tableName);
     properties.setProperty(HoodieTableConfig.TYPE.key(), tableType.name());
     properties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), 
HoodieAvroPayload.class.getName());
-    properties.setProperty(HoodieTableConfig.MERGER_STRATEGY.key(), 
HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID);
+    properties.setProperty(HoodieTableConfig.RECORD_MERGER_STRATEGY.key(), 
HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID);
     return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, 
basePath, properties);
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 32c58665572..5de9cc8a411 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -344,12 +344,12 @@ object DataSourceWriteOptions {
    * HoodieMerger will replace the payload to process the merge of data
    * and provide the same capabilities as the payload
    */
-  val MERGER_IMPLS = HoodieWriteConfig.MERGER_IMPLS
+  val RECORD_MERGER_IMPLS = HoodieWriteConfig.RECORD_MERGER_IMPLS
 
   /**
    * Id of merger strategy
    */
-  val MERGER_STRATEGY = HoodieWriteConfig.MERGER_STRATEGY
+  val RECORD_MERGER_STRATEGY = HoodieWriteConfig.RECORD_MERGER_STRATEGY
 
   /**
    * Record key field. Value to be used as the `recordKey` component of 
`HoodieKey`. Actual value
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 9f30db6158f..80914337eec 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -78,8 +78,8 @@ case class HoodieTableState(tablePath: String,
                             usesVirtualKeys: Boolean,
                             recordPayloadClassName: String,
                             metadataConfig: HoodieMetadataConfig,
-                            mergerImpls: List[String],
-                            mergerStrategy: String)
+                            recordMergerImpls: List[String],
+                            recordMergerStrategy: String)
 
 /**
  * Hoodie BaseRelation which extends [[PrunedFilteredScan]].
@@ -460,10 +460,10 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
   }
 
   protected def getTableState: HoodieTableState = {
-    val mergerImpls = 
ConfigUtils.split2List(getConfigValue(HoodieWriteConfig.MERGER_IMPLS)).asScala.toList
+    val recordMergerImpls = 
ConfigUtils.split2List(getConfigValue(HoodieWriteConfig.RECORD_MERGER_IMPLS)).asScala.toList
 
-    val mergerStrategy = getConfigValue(HoodieWriteConfig.MERGER_STRATEGY,
-      Option(metaClient.getTableConfig.getMergerStrategy))
+    val recordMergerStrategy = 
getConfigValue(HoodieWriteConfig.RECORD_MERGER_STRATEGY,
+      Option(metaClient.getTableConfig.getRecordMergerStrategy))
 
     // Subset of the state of table's configuration as of at the time of the 
query
     HoodieTableState(
@@ -474,8 +474,8 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
       usesVirtualKeys = !tableConfig.populateMetaFields(),
       recordPayloadClassName = tableConfig.getPayloadClass,
       metadataConfig = fileIndex.metadataConfig,
-      mergerImpls = mergerImpls,
-      mergerStrategy = mergerStrategy
+      recordMergerImpls = recordMergerImpls,
+      recordMergerStrategy = recordMergerStrategy
     )
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index d0e270ed136..90535348ae6 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -18,7 +18,7 @@
 package org.apache.hudi
 
 import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
-import org.apache.hudi.DataSourceWriteOptions.{MERGER_IMPLS, _}
+import org.apache.hudi.DataSourceWriteOptions.{RECORD_MERGER_IMPLS, _}
 import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, 
HoodieCommonConfig, HoodieConfig}
 import org.apache.hudi.common.table.HoodieTableConfig
@@ -224,7 +224,7 @@ object HoodieWriterUtils {
     PARTITIONPATH_FIELD -> HoodieTableConfig.PARTITION_FIELDS,
     RECORDKEY_FIELD -> HoodieTableConfig.RECORDKEY_FIELDS,
     PAYLOAD_CLASS_NAME -> HoodieTableConfig.PAYLOAD_CLASS_NAME,
-    MERGER_STRATEGY -> HoodieTableConfig.MERGER_STRATEGY
+    RECORD_MERGER_STRATEGY -> HoodieTableConfig.RECORD_MERGER_STRATEGY
   )
   def mappingSparkDatasourceConfigsToTableConfigs(options: Map[String, 
String]): Map[String, String] = {
     val includingTableConfigs = scala.collection.mutable.Map() ++ options
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
index 7e18f7a9746..d81745156a6 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -195,7 +195,7 @@ class RecordMergingFileIterator(split: 
HoodieMergeOnReadFileSplit,
 
   private val baseFileIterator = baseFileReader(split.dataFile.get)
 
-  private val recordMerger = 
HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK, 
tableState.mergerImpls.asJava, tableState.mergerStrategy)
+  private val recordMerger = 
HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK, 
tableState.recordMergerImpls.asJava, tableState.recordMergerStrategy)
 
   override def doHasNext: Boolean = hasNextInternal
 
@@ -316,7 +316,7 @@ object LogFileIterator {
           getRelativePartitionPath(new Path(tableState.tablePath), 
logFiles.head.getPath.getParent))
       }
 
-      val recordMerger = 
HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK, 
tableState.mergerImpls.asJava, tableState.mergerStrategy)
+      val recordMerger = 
HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK, 
tableState.recordMergerImpls.asJava, tableState.recordMergerStrategy)
       logRecordScannerBuilder.withRecordMerger(recordMerger)
 
       logRecordScannerBuilder.build()
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 9df5586b3d9..17a1cab27fe 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -161,8 +161,8 @@ class HoodieCDCRDD(
         metaClient.getTableConfig.getPayloadClass,
         metadataConfig,
         // TODO support CDC with spark record
-        mergerImpls = List(classOf[HoodieAvroRecordMerger].getName),
-        mergerStrategy = HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
+        recordMergerImpls = List(classOf[HoodieAvroRecordMerger].getName),
+        recordMergerStrategy = HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
       )
     }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index 7219af6c4af..7da69b2c0bb 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -69,10 +69,10 @@ object HoodieOptionConfig {
     .defaultValue(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue())
     .build()
 
-  val SQL_MERGER_STRATEGY: HoodieSQLOption[String] = buildConf()
-    .withSqlKey("mergerStrategy")
-    .withHoodieKey(DataSourceWriteOptions.MERGER_STRATEGY.key)
-    .withTableConfigKey(HoodieTableConfig.MERGER_STRATEGY.key)
+  val SQL_RECORD_MERGER_STRATEGY: HoodieSQLOption[String] = buildConf()
+    .withSqlKey("recordMergerStrategy")
+    .withHoodieKey(DataSourceWriteOptions.RECORD_MERGER_STRATEGY.key)
+    .withTableConfigKey(HoodieTableConfig.RECORD_MERGER_STRATEGY.key)
     .defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
     .build()
 
@@ -197,7 +197,7 @@ object HoodieOptionConfig {
   // extract primaryKey, preCombineField, type options
   def extractSqlOptions(options: Map[String, String]): Map[String, String] = {
     val sqlOptions = mapTableConfigsToSqlOptions(options)
-    val targetOptions = sqlOptionKeyToWriteConfigKey.keySet -- 
Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_MERGER_STRATEGY.sqlKeyName)
+    val targetOptions = sqlOptionKeyToWriteConfigKey.keySet -- 
Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_RECORD_MERGER_STRATEGY.sqlKeyName)
     sqlOptions.filterKeys(targetOptions.contains)
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index db2b93eda08..bf6b6509b95 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -66,7 +66,7 @@ trait ProvidesHoodieConfig extends Logging {
         RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
         TBL_NAME.key -> hoodieCatalogTable.tableName,
         PRECOMBINE_FIELD.key -> preCombineField,
-        MERGER_IMPLS.key -> 
hoodieProps.getString(HoodieWriteConfig.MERGER_IMPLS.key, 
HoodieWriteConfig.MERGER_IMPLS.defaultValue),
+        RECORD_MERGER_IMPLS.key -> 
hoodieProps.getString(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, 
HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue),
         HIVE_STYLE_PARTITIONING.key -> 
tableConfig.getHiveStylePartitioningEnable,
         URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
         KEYGENERATOR_CLASS_NAME.key -> 
classOf[SqlKeyGenerator].getCanonicalName,
@@ -193,7 +193,7 @@ trait ProvidesHoodieConfig extends Logging {
         PRECOMBINE_FIELD.key -> preCombineField,
         PARTITIONPATH_FIELD.key -> partitionFieldsStr,
         PAYLOAD_CLASS_NAME.key -> payloadClassName,
-        MERGER_IMPLS.key -> 
hoodieProps.getString(HoodieWriteConfig.MERGER_IMPLS.key, 
HoodieWriteConfig.MERGER_IMPLS.defaultValue),
+        RECORD_MERGER_IMPLS.key -> 
hoodieProps.getString(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, 
HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue),
         ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
         HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> 
String.valueOf(hasPrecombineColumn),
         HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index d4cba17f4d4..7befb97c7b8 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -189,7 +189,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     // Create the write parameters
     val parameters = buildMergeIntoConfig(hoodieCatalogTable)
     // TODO Remove it when we implement ExpressionPayload for SparkRecord
-    val parametersWithAvroRecordMerger = parameters ++ 
Map(HoodieWriteConfig.MERGER_IMPLS.key -> 
classOf[HoodieAvroRecordMerger].getName)
+    val parametersWithAvroRecordMerger = parameters ++ 
Map(HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> 
classOf[HoodieAvroRecordMerger].getName)
     executeUpsert(sourceDF, parametersWithAvroRecordMerger)
 
     sparkSession.catalog.refreshTable(targetTableIdentify.unquotedString)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 21b6a34c94f..ea34123be43 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -73,7 +73,7 @@ class TestCOWDataSource extends HoodieClientTestBase with 
ScalaAssertionSupport
     HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
     HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
   )
-  val sparkOpts = Map(HoodieWriteConfig.MERGER_IMPLS.key -> 
classOf[HoodieSparkRecordMerger].getName)
+  val sparkOpts = Map(HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> 
classOf[HoodieSparkRecordMerger].getName)
 
   val verificationCol: String = "driver"
   val updatedVerificationVal: String = "driver_update"
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index e6e850d6212..184cb97f2be 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -68,7 +68,7 @@ class TestMORDataSource extends HoodieClientTestBase with 
SparkDatasetMixin {
     HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
   )
   val sparkOpts = Map(
-    HoodieWriteConfig.MERGER_IMPLS.key -> 
classOf[HoodieSparkRecordMerger].getName,
+    HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> 
classOf[HoodieSparkRecordMerger].getName,
     HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet"
   )
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala
index 3547e42148d..582a6b4cf20 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala
@@ -82,7 +82,7 @@ object ReadAndWriteWithoutAvroBenchmark extends 
HoodieBenchmarkBase {
     if (spark.catalog.tableExists(tableName)) {
       spark.sql(s"drop table if exists $tableName")
     }
-    spark.sql(s"set ${HoodieWriteConfig.MERGER_IMPLS.key} = $mergerImpl")
+    spark.sql(s"set ${HoodieWriteConfig.RECORD_MERGER_IMPLS.key} = 
$mergerImpl")
     spark.sql(
       s"""
          |create table $tableName(
@@ -157,7 +157,7 @@ object ReadAndWriteWithoutAvroBenchmark extends 
HoodieBenchmarkBase {
         prepareHoodieTable(sparkTable, new Path(sparkPath.getCanonicalPath, 
sparkTable).toUri.toString, "mor", sparkMergerImpl, df)
         Seq(avroMergerImpl, sparkMergerImpl).zip(Seq(avroTable, 
sparkTable)).foreach {
           case (mergerImpl, tableName) => upsertBenchmark.addCase(mergerImpl) 
{ _ =>
-            spark.sql(s"set ${HoodieWriteConfig.MERGER_IMPLS.key} = 
$mergerImpl")
+            spark.sql(s"set ${HoodieWriteConfig.RECORD_MERGER_IMPLS.key} = 
$mergerImpl")
             spark.sql(s"update $tableName set s1 = 's1_new_1' where id > 0")
           }
         }
@@ -166,7 +166,7 @@ object ReadAndWriteWithoutAvroBenchmark extends 
HoodieBenchmarkBase {
         val readBenchmark = new HoodieBenchmark("pref read", 10000, 3)
         Seq(avroMergerImpl, sparkMergerImpl).zip(Seq(avroTable, 
sparkTable)).foreach {
           case (mergerImpl, tableName) => readBenchmark.addCase(mergerImpl) { 
_ =>
-            spark.sql(s"set ${HoodieWriteConfig.MERGER_IMPLS.key} = 
$mergerImpl")
+            spark.sql(s"set ${HoodieWriteConfig.RECORD_MERGER_IMPLS.key} = 
$mergerImpl")
             spark.sql(s"select * from $tableName").collect()
           }
         }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
index a185b3849bd..cf37f7436fd 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
@@ -200,7 +200,7 @@ class HoodieSparkSqlTestBase extends FunSuite with 
BeforeAndAfterAll {
         case _ => (classOf[HoodieAvroRecordMerger].getName, "avro")
       }
       val config = Map(
-        HoodieWriteConfig.MERGER_IMPLS.key -> merger,
+        HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> merger,
         HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> format) ++ 
recordConfig.getOrElse(recordType, Map.empty)
       withSQLConf(config.toList:_*) {
         f
@@ -211,7 +211,7 @@ class HoodieSparkSqlTestBase extends FunSuite with 
BeforeAndAfterAll {
   }
 
   protected def getRecordType(): HoodieRecordType = {
-    val merger = 
spark.sessionState.conf.getConfString(HoodieWriteConfig.MERGER_IMPLS.key, 
HoodieWriteConfig.MERGER_IMPLS.defaultValue())
+    val merger = 
spark.sessionState.conf.getConfString(HoodieWriteConfig.RECORD_MERGER_IMPLS.key,
 HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue())
     if (merger.equals(classOf[HoodieSparkRecordMerger].getName)) {
       HoodieRecordType.SPARK
     } else {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
index 2cb9c98878b..4e726317844 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
@@ -36,13 +36,13 @@ class TestHoodieOptionConfig extends 
SparkClientFunctionalTestHarness {
     assertTrue(with1("primaryKey") == "id")
     assertTrue(with1("type") == "cow")
     assertTrue(with1("payloadClass") == 
classOf[OverwriteWithLatestAvroPayload].getName)
-    assertTrue(with1("mergerStrategy") == 
HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
+    assertTrue(with1("recordMergerStrategy") == 
HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
 
     val ops2 = Map("primaryKey" -> "id",
       "preCombineField" -> "timestamp",
       "type" -> "mor",
       "payloadClass" -> classOf[DefaultHoodieRecordPayload].getName,
-      "mergerStrategy" -> HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
+      "recordMergerStrategy" -> HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
     )
     val with2 = HoodieOptionConfig.withDefaultSqlOptions(ops2)
     assertTrue(ops2 == with2)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index b6444b52b52..868a96ebc2c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -231,7 +231,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
    withRecordType(Map(HoodieRecordType.SPARK ->
      // SparkMerger should use "HoodieSparkValidateDuplicateKeyRecordMerger"
      // with "hoodie.sql.insert.mode=strict"
-     Map(HoodieWriteConfig.MERGER_IMPLS.key ->
+     Map(HoodieWriteConfig.RECORD_MERGER_IMPLS.key ->
        
classOf[HoodieSparkValidateDuplicateKeyRecordMerger].getName)))(withTempDir { 
tmp =>
      val tableName = generateTableName
      spark.sql(s"set hoodie.sql.insert.mode=strict")
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 14d6d9afaee..cd46e9f3789 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -630,7 +630,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
   }
 
   val sparkOpts = Map(
-    HoodieWriteConfig.MERGER_IMPLS.key -> 
classOf[HoodieSparkRecordMerger].getName,
+    HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> 
classOf[HoodieSparkRecordMerger].getName,
     HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet"
   )
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala
index 72bd71e4bba..dbf6d173865 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala
@@ -86,7 +86,7 @@ class TestSqlConf extends HoodieSparkSqlTestBase with 
BeforeAndAfter {
         new Path(tablePath).getFileSystem(new Configuration),
         s"$tablePath/" + HoodieTableMetaClient.METAFOLDER_NAME,
         HoodieTableConfig.PAYLOAD_CLASS_NAME.defaultValue,
-        HoodieTableConfig.MERGER_STRATEGY.defaultValue).getTableType)
+        HoodieTableConfig.RECORD_MERGER_STRATEGY.defaultValue).getTableType)
 
       // Manually pass incremental configs to global configs to make sure Hudi 
query is able to load the
       // global configs
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 10dbf998a5f..63cf706c174 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -281,7 +281,7 @@ public class DeltaSync implements Serializable, Closeable {
             .setConf(new Configuration(fs.getConf()))
             .setBasePath(cfg.targetBasePath)
             .setPayloadClassName(cfg.payloadClassName)
-            
.setMergerStrategy(props.getProperty(HoodieWriteConfig.MERGER_STRATEGY.key(), 
HoodieWriteConfig.MERGER_STRATEGY.defaultValue()))
+            
.setRecordMergerStrategy(props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(),
 HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()))
             .build();
         switch (meta.getTableType()) {
           case COPY_ON_WRITE:
@@ -474,8 +474,8 @@ public class DeltaSync implements Serializable, Closeable {
 
   private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> 
fetchFromSource(Option<String> resumeCheckpointStr) {
     HoodieRecordType recordType = HoodieRecordUtils.createRecordMerger(null, 
EngineType.SPARK,
-        
ConfigUtils.split2List(props.getProperty(HoodieWriteConfig.MERGER_IMPLS.key(), 
HoodieWriteConfig.MERGER_IMPLS.defaultValue())),
-        props.getProperty(HoodieWriteConfig.MERGER_STRATEGY.key(), 
HoodieWriteConfig.MERGER_STRATEGY.defaultValue())).getRecordType();
+        
ConfigUtils.split2List(props.getProperty(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(),
 HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue())),
+        props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(), 
HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue())).getRecordType();
     if (recordType == HoodieRecordType.SPARK && 
HoodieTableType.valueOf(cfg.tableType) == HoodieTableType.MERGE_ON_READ
         && 
HoodieLogBlockType.fromId(props.getProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),
 "avro"))
         != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 60b870b6549..ffa2a812f6f 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -272,7 +272,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
     static void addRecordMerger(HoodieRecordType type, List<String> 
hoodieConfig) {
       if (type == HoodieRecordType.SPARK) {
-        hoodieConfig.add(String.format("%s=%s", 
HoodieWriteConfig.MERGER_IMPLS.key(), HoodieSparkRecordMerger.class.getName()));
+        hoodieConfig.add(String.format("%s=%s", 
HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), 
HoodieSparkRecordMerger.class.getName()));
         hoodieConfig.add(String.format("%s=%s", 
HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),"parquet"));
       }
     }
diff --git a/rfc/rfc-46/rfc-46.md b/rfc/rfc-46/rfc-46.md
index c16ff178f77..d7d08f4a7ae 100644
--- a/rfc/rfc-46/rfc-46.md
+++ b/rfc/rfc-46/rfc-46.md
@@ -148,7 +148,7 @@ class HoodieFlinkRecordMerger implements HoodieRecordMerger 
{
 Where user can provide their own subclass implementing such interface for the 
engines of interest.
 
 ### Merging Strategy
-The RecordMerger is engine-aware. We provide a config called 
HoodieWriteConfig.MERGER_IMPLS. You can set a list of RecordMerger class name 
to it. And you can set HoodieWriteConfig.MERGER_STRATEGY which is UUID of 
RecordMerger. Hudi will pick RecordMergers in MERGER_IMPLS which has the same 
MERGER_STRATEGY according to the engine type at runtime.
+The RecordMerger is engine-aware. We provide a config called 
HoodieWriteConfig.RECORD_MERGER_IMPLS. You can set a list of RecordMerger class 
names to it. And you can set HoodieWriteConfig.RECORD_MERGER_STRATEGY which is 
the UUID of the RecordMerger. Hudi will pick RecordMergers in RECORD_MERGER_IMPLS which have 
the same RECORD_MERGER_STRATEGY according to the engine type at runtime.
 - Every RecordMerger implementation being engine-specific (referred to as 
"implementation"), implements particular merging semantic (referred to as 
"merging strategy")
 - Such tiering allowing us to be flexible in terms of providing 
implementations for the merging strategy only for engines you might be 
interested in
 - Merging strategy is a table property that is set once during init

Reply via email to