This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 544f9e5a2f6 [HUDI-6780] Introduce enums instead of classnames in table properties (#9590)
544f9e5a2f6 is described below

commit 544f9e5a2f63a343ec8c56b2b1e8bcd2b42aaa33
Author: Sagar Sumit <sagarsumi...@gmail.com>
AuthorDate: Thu Sep 14 20:29:44 2023 +0530

    [HUDI-6780] Introduce enums instead of classnames in table properties (#9590)
    
    There are three configs in `hoodie.properties` that hold classnames:
    `KEY_GENERATOR_CLASS_NAME`,
    `BOOTSTRAP_INDEX_CLASS_NAME`, and
    `PAYLOAD_CLASS_NAME`.
    This PR adds enums for them, stores the enum in the table config, and
    infers the classname from the enum in the getters.
---
 .../apache/hudi/config/HoodieBootstrapConfig.java  |   5 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  11 +-
 .../common/bootstrap/index/BootstrapIndex.java     |   6 +-
 .../hudi/common/model/BootstrapIndexType.java      |  76 ++++++++++++++
 .../hudi/common/model/RecordPayloadType.java       | 112 +++++++++++++++++++++
 .../hudi/common/table/HoodieTableConfig.java       |  63 +++++++++---
 .../hudi/common/table/HoodieTableMetaClient.java   |  54 +++++++---
 .../org/apache/hudi/common/util/ConfigUtils.java   |  10 +-
 .../hudi/keygen/constant/KeyGeneratorType.java     |  83 +++++++++++++--
 .../org/apache/hudi/table/HoodieTableFactory.java  |   4 +
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  26 ++---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  26 +++--
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |   8 +-
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  |   6 +-
 .../apache/spark/sql/hudi/HoodieOptionConfig.scala |  11 +-
 .../apache/hudi/cli/BootstrapExecutorUtils.java    |   8 +-
 .../apache/spark/sql/hudi/TestCreateTable.scala    |   7 +-
 .../spark/sql/hudi/TestHoodieOptionConfig.scala    |   7 +-
 .../sql/hudi/procedure/TestRepairsProcedure.scala  |   2 +-
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  37 ++-----
 .../TestHoodieMultiTableDeltaStreamer.java         |   2 +-
 .../short_trip_uber_config.properties              |   2 +-
 22 files changed, 451 insertions(+), 115 deletions(-)

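For illustration, a minimal sketch of how the new enum-backed table properties resolve to
classnames (it assumes HoodieConfig's no-argument constructor from hudi-common; the class
name of the sketch itself is made up, and expected output is noted in comments):

    import org.apache.hudi.common.config.HoodieConfig;
    import org.apache.hudi.common.model.RecordPayloadType;
    import org.apache.hudi.common.table.HoodieTableConfig;

    // Hypothetical example class; not part of this commit.
    public class PayloadTypeResolutionSketch {
      public static void main(String[] args) {
        // New-style table config: only the enum name is stored in hoodie.properties.
        HoodieConfig config = new HoodieConfig();
        config.setValue(HoodieTableConfig.PAYLOAD_TYPE, RecordPayloadType.OVERWRITE_LATEST_AVRO.name());
        // The getter infers the classname from the enum:
        // org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
        System.out.println(RecordPayloadType.getPayloadClassName(config));

        // Legacy-style table config: an explicit classname still takes precedence,
        // and the old com.uber.hoodie package is transparently rewritten.
        config.setValue(HoodieTableConfig.PAYLOAD_CLASS_NAME, "com.uber.hoodie.OverwriteWithLatestAvroPayload");
        // org.apache.hudi.OverwriteWithLatestAvroPayload
        System.out.println(RecordPayloadType.getPayloadClassName(config));
      }
    }

The same resolution pattern backs the bootstrap index and key generator properties via
BootstrapIndexType.getBootstrapIndexClassName and KeyGeneratorType.getKeyGeneratorClassName.
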
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
index d88f0bb2e6f..297ad381907 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
@@ -26,7 +26,7 @@ import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.model.BootstrapIndexType;
 
 import java.io.File;
 import java.io.FileReader;
@@ -250,8 +250,7 @@ public class HoodieBootstrapConfig extends HoodieConfig {
 
     public HoodieBootstrapConfig build() {
       // TODO: use infer function instead
-      bootstrapConfig.setDefaultValue(INDEX_CLASS_NAME, HoodieTableConfig.getDefaultBootstrapIndexClass(
-          bootstrapConfig.getProps()));
+      bootstrapConfig.setDefaultValue(INDEX_CLASS_NAME, BootstrapIndexType.getDefaultBootstrapIndexClassName(bootstrapConfig));
       bootstrapConfig.setDefaults(HoodieBootstrapConfig.class.getName());
       return bootstrapConfig;
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 5f83a67486a..82c0eff1610 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -42,6 +42,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.RecordPayloadType;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -151,6 +152,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. "
           + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective");
 
+  public static final ConfigProperty<String> WRITE_PAYLOAD_TYPE = ConfigProperty
+      .key("hoodie.datasource.write.payload.type")
+      .defaultValue(RecordPayloadType.OVERWRITE_LATEST_AVRO.name())
+      .markAdvanced()
+      .withDocumentation(RecordPayloadType.class);
+
   public static final ConfigProperty<String> RECORD_MERGER_IMPLS = ConfigProperty
       .key("hoodie.datasource.write.record.merger.impls")
       .defaultValue(HoodieAvroRecordMerger.class.getName())
@@ -1242,10 +1249,6 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(PRECOMBINE_FIELD_NAME);
   }
 
-  public String getWritePayloadClass() {
-    return getString(WRITE_PAYLOAD_CLASS_NAME);
-  }
-
   public String getKeyGeneratorClass() {
     return getString(KEYGENERATOR_CLASS_NAME);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java
index abd3ac51a20..80569a9f1f6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.bootstrap.index;
 
 import org.apache.hudi.common.model.BootstrapFileMapping;
+import org.apache.hudi.common.model.BootstrapIndexType;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -160,7 +161,8 @@ public abstract class BootstrapIndex implements Serializable {
   }
 
   public static BootstrapIndex getBootstrapIndex(HoodieTableMetaClient metaClient) {
-    return ((BootstrapIndex)(ReflectionUtils.loadClass(
-        metaClient.getTableConfig().getBootstrapIndexClass(), new Class[]{HoodieTableMetaClient.class}, metaClient)));
+    return ((BootstrapIndex) (ReflectionUtils.loadClass(
+        BootstrapIndexType.getBootstrapIndexClassName(metaClient.getTableConfig()),
+        new Class[] {HoodieTableMetaClient.class}, metaClient)));
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BootstrapIndexType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BootstrapIndexType.java
new file mode 100644
index 00000000000..c2233f39cea
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BootstrapIndexType.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
+import org.apache.hudi.common.bootstrap.index.NoOpBootstrapIndex;
+import org.apache.hudi.common.config.EnumDescription;
+import org.apache.hudi.common.config.EnumFieldDescription;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.BOOTSTRAP_INDEX_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.BOOTSTRAP_INDEX_ENABLE;
+import static org.apache.hudi.common.table.HoodieTableConfig.BOOTSTRAP_INDEX_TYPE;
+
+@EnumDescription("Bootstrap index type to use for mapping between skeleton and actual data files.")
+public enum BootstrapIndexType {
+  @EnumFieldDescription("Maintains mapping in HFile format.")
+  HFILE(HFileBootstrapIndex.class.getName()),
+  @EnumFieldDescription("No-op, an empty implementation.")
+  NO_OP(NoOpBootstrapIndex.class.getName());
+
+  private final String className;
+
+  BootstrapIndexType(String className) {
+    this.className = className;
+  }
+
+  public String getClassName() {
+    return className;
+  }
+
+  public static BootstrapIndexType fromClassName(String className) {
+    for (BootstrapIndexType type : BootstrapIndexType.values()) {
+      if (type.getClassName().equals(className)) {
+        return type;
+      }
+    }
+    throw new IllegalArgumentException("No BootstrapIndexType found for class name: " + className);
+  }
+
+  public static String getBootstrapIndexClassName(HoodieConfig config) {
+    if (!config.getBooleanOrDefault(BOOTSTRAP_INDEX_ENABLE)) {
+      return BootstrapIndexType.NO_OP.getClassName();
+    }
+    if (config.contains(BOOTSTRAP_INDEX_CLASS_NAME)) {
+      return config.getString(BOOTSTRAP_INDEX_CLASS_NAME);
+    } else if (config.contains(BOOTSTRAP_INDEX_TYPE)) {
+      return BootstrapIndexType.valueOf(config.getString(BOOTSTRAP_INDEX_TYPE)).getClassName();
+    }
+    return getDefaultBootstrapIndexClassName(config);
+  }
+
+  public static String getDefaultBootstrapIndexClassName(HoodieConfig config) {
+    if (!config.getBooleanOrDefault(BOOTSTRAP_INDEX_ENABLE)) {
+      return BootstrapIndexType.NO_OP.getClassName();
+    }
+    return BootstrapIndexType.valueOf(BOOTSTRAP_INDEX_TYPE.defaultValue()).getClassName();
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/RecordPayloadType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/RecordPayloadType.java
new file mode 100644
index 00000000000..d1eae004dc5
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/RecordPayloadType.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.config.EnumDescription;
+import org.apache.hudi.common.config.EnumFieldDescription;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_TYPE;
+
+/**
+ * Payload to use for record.
+ */
+@EnumDescription("Payload to use for merging records")
+public enum RecordPayloadType {
+  @EnumFieldDescription("Provides support for seamlessly applying changes captured via Amazon Database Migration Service onto S3.")
+  AWS_DMS_AVRO(AWSDmsAvroPayload.class.getName()),
+
+  @EnumFieldDescription("A payload to wrap a existing Hoodie Avro Record. Useful to create a HoodieRecord over existing GenericRecords.")
+  HOODIE_AVRO(HoodieAvroPayload.class.getName()),
+
+  @EnumFieldDescription("Honors ordering field in both preCombine and combineAndGetUpdateValue.")
+  HOODIE_AVRO_DEFAULT(DefaultHoodieRecordPayload.class.getName()),
+
+  @EnumFieldDescription("The only difference with HOODIE_AVRO_DEFAULT is that this does not track the event time metadata for efficiency")
+  EVENT_TIME_AVRO(EventTimeAvroPayload.class.getName()),
+
+  @EnumFieldDescription("Subclass of OVERWRITE_LATEST_AVRO used for delta streamer.")
+  OVERWRITE_NON_DEF_LATEST_AVRO(OverwriteNonDefaultsWithLatestAvroPayload.class.getName()),
+
+  @EnumFieldDescription("Default payload used for delta streamer.")
+  OVERWRITE_LATEST_AVRO(OverwriteWithLatestAvroPayload.class.getName()),
+
+  @EnumFieldDescription("Used for partial update to Hudi Table.")
+  PARTIAL_UPDATE_AVRO(PartialUpdateAvroPayload.class.getName()),
+
+  @EnumFieldDescription("Provides support for seamlessly applying changes captured via Debezium for MysqlDB.")
+  MYSQL_DEBEZIUM_AVRO(MySqlDebeziumAvroPayload.class.getName()),
+
+  @EnumFieldDescription("Provides support for seamlessly applying changes captured via Debezium for PostgresDB.")
+  POSTGRES_DEBEZIUM_AVRO(PostgresDebeziumAvroPayload.class.getName()),
+
+  @EnumFieldDescription("A record payload Hudi's internal metadata table.")
+  HOODIE_METADATA(HoodieMetadataPayload.class.getName()),
+
+  @EnumFieldDescription("A record payload to validate the duplicate key for INSERT statement in spark-sql.")
+  VALIDATE_DUPLICATE_AVRO("org.apache.spark.sql.hudi.command.ValidateDuplicateKeyPayload"),
+
+  @EnumFieldDescription("A record payload for MERGE INTO statement in spark-sql.")
+  EXPRESSION_AVRO("org.apache.spark.sql.hudi.command.payload.ExpressionPayload"),
+
+  @EnumFieldDescription("Use the payload class set in `hoodie.datasource.write.payload.class`")
+  CUSTOM("");
+
+  private String className;
+
+  RecordPayloadType(String className) {
+    this.className = className;
+  }
+
+  public String getClassName() {
+    return className;
+  }
+
+  public static RecordPayloadType fromClassName(String className) {
+    for (RecordPayloadType type : RecordPayloadType.values()) {
+      if (type.getClassName().equals(className)) {
+        return type;
+      }
+    }
+    // No RecordPayloadType found for class name, return CUSTOM
+    CUSTOM.className = className;
+    return CUSTOM;
+  }
+
+  public static String getPayloadClassName(HoodieConfig config) {
+    String payloadClassName;
+    if (config.contains(PAYLOAD_CLASS_NAME)) {
+      payloadClassName = config.getString(PAYLOAD_CLASS_NAME);
+    } else if (config.contains(PAYLOAD_TYPE)) {
+      payloadClassName = RecordPayloadType.valueOf(config.getString(PAYLOAD_TYPE)).getClassName();
+    } else if (config.contains("hoodie.datasource.write.payload.class")) {
+      payloadClassName = config.getString("hoodie.datasource.write.payload.class");
+    } else {
+      payloadClassName = RecordPayloadType.valueOf(PAYLOAD_TYPE.defaultValue()).getClassName();
+    }
+    // There could be tables written with payload class from com.uber.hoodie.
+    // Need to transparently change to org.apache.hudi.
+    return payloadClassName.replace("com.uber.hoodie", "org.apache.hudi");
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index cae35b18da3..2e1ad0b3bb6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -19,17 +19,18 @@
 package org.apache.hudi.common.table;
 
 import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
-import org.apache.hudi.common.bootstrap.index.NoOpBootstrapIndex;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.OrderedProperties;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.BootstrapIndexType;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieTimelineTimeZone;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.RecordPayloadType;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -40,6 +41,7 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.metadata.MetadataPartitionType;
 
 import org.apache.avro.Schema;
@@ -164,9 +166,16 @@ public class HoodieTableConfig extends HoodieConfig {
   public static final ConfigProperty<String> PAYLOAD_CLASS_NAME = ConfigProperty
       .key("hoodie.compaction.payload.class")
       .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
+      .deprecatedAfter("1.0.0")
       .withDocumentation("Payload class to use for performing compactions, i.e merge delta logs with current base file and then "
           + " produce a new base file.");
 
+  public static final ConfigProperty<String> PAYLOAD_TYPE = ConfigProperty
+      .key("hoodie.compaction.payload.type")
+      .defaultValue(RecordPayloadType.OVERWRITE_LATEST_AVRO.name())
+      .sinceVersion("1.0.0")
+      .withDocumentation(RecordPayloadType.class);
+
   public static final ConfigProperty<String> RECORD_MERGER_STRATEGY = ConfigProperty
       .key("hoodie.compaction.record.merger.strategy")
       .defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
@@ -186,8 +195,15 @@ public class HoodieTableConfig extends HoodieConfig {
   public static final ConfigProperty<String> BOOTSTRAP_INDEX_CLASS_NAME = ConfigProperty
       .key("hoodie.bootstrap.index.class")
       .defaultValue(HFileBootstrapIndex.class.getName())
+      .deprecatedAfter("1.0.0")
       .withDocumentation("Implementation to use, for mapping base files to bootstrap base file, that contain actual data.");
 
+  public static final ConfigProperty<String> BOOTSTRAP_INDEX_TYPE = ConfigProperty
+      .key("hoodie.bootstrap.index.type")
+      .defaultValue(BootstrapIndexType.HFILE.name())
+      .sinceVersion("1.0.0")
+      .withDocumentation("Bootstrap index type determines which implementation to use, for mapping base files to bootstrap base file, that contain actual data.");
+
   public static final ConfigProperty<String> BOOTSTRAP_BASE_PATH = ConfigProperty
       .key("hoodie.bootstrap.base.path")
       .noDefaultValue()
@@ -202,8 +218,15 @@ public class HoodieTableConfig extends HoodieConfig {
   public static final ConfigProperty<String> KEY_GENERATOR_CLASS_NAME = ConfigProperty
       .key("hoodie.table.keygenerator.class")
       .noDefaultValue()
+      .deprecatedAfter("1.0.0")
       .withDocumentation("Key Generator class property for the hoodie table");
 
+  public static final ConfigProperty<String> KEY_GENERATOR_TYPE = ConfigProperty
+      .key("hoodie.table.keygenerator.type")
+      .noDefaultValue()
+      .sinceVersion("1.0.0")
+      .withDocumentation("Key Generator type to determine key generator class");
+
   public static final ConfigProperty<HoodieTimelineTimeZone> TIMELINE_TIMEZONE = ConfigProperty
       .key("hoodie.table.timeline.timezone")
       .defaultValue(HoodieTimelineTimeZone.LOCAL)
@@ -235,8 +258,6 @@ public class HoodieTableConfig extends HoodieConfig {
       DATE_TIME_PARSER
   );
 
-  public static final String NO_OP_BOOTSTRAP_INDEX_CLASS = NoOpBootstrapIndex.class.getName();
-
   public static final ConfigProperty<String> TABLE_CHECKSUM = ConfigProperty
       .key("hoodie.table.checksum")
       .noDefaultValue()
@@ -282,6 +303,11 @@ public class HoodieTableConfig extends HoodieConfig {
         setValue(PAYLOAD_CLASS_NAME, payloadClassName);
         needStore = true;
       }
+      if (contains(PAYLOAD_TYPE) && payloadClassName != null
+          && !payloadClassName.equals(RecordPayloadType.valueOf(getString(PAYLOAD_TYPE)).getClassName())) {
+        setValue(PAYLOAD_TYPE, RecordPayloadType.fromClassName(payloadClassName).name());
+        needStore = true;
+      }
       if (contains(RECORD_MERGER_STRATEGY) && recordMergerStrategyId != null
           && !getString(RECORD_MERGER_STRATEGY).equals(recordMergerStrategyId)) {
         setValue(RECORD_MERGER_STRATEGY, recordMergerStrategyId);
@@ -476,7 +502,7 @@ public class HoodieTableConfig extends HoodieConfig {
       }
       hoodieConfig.setDefaultValue(TYPE);
       if (hoodieConfig.getString(TYPE).equals(HoodieTableType.MERGE_ON_READ.name())) {
-        hoodieConfig.setDefaultValue(PAYLOAD_CLASS_NAME);
+        hoodieConfig.setDefaultValue(PAYLOAD_TYPE);
         hoodieConfig.setDefaultValue(RECORD_MERGER_STRATEGY);
       }
       hoodieConfig.setDefaultValue(ARCHIVELOG_FOLDER);
@@ -486,7 +512,7 @@ public class HoodieTableConfig extends HoodieConfig {
       }
       if (hoodieConfig.contains(BOOTSTRAP_BASE_PATH)) {
         // Use the default bootstrap index class.
-        hoodieConfig.setDefaultValue(BOOTSTRAP_INDEX_CLASS_NAME, getDefaultBootstrapIndexClass(properties));
+        hoodieConfig.setDefaultValue(BOOTSTRAP_INDEX_CLASS_NAME, BootstrapIndexType.getDefaultBootstrapIndexClassName(hoodieConfig));
       }
       if (hoodieConfig.contains(TIMELINE_TIMEZONE)) {
         HoodieInstantTimeGenerator.setCommitTimeZone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getString(TIMELINE_TIMEZONE)));
@@ -540,10 +566,7 @@ public class HoodieTableConfig extends HoodieConfig {
    * Read the payload class for HoodieRecords from the table properties.
    */
   public String getPayloadClass() {
-    // There could be tables written with payload class from com.uber.hoodie. Need to transparently
-    // change to org.apache.hudi
-    return getStringOrDefault(PAYLOAD_CLASS_NAME).replace("com.uber.hoodie",
-        "org.apache.hudi");
+    return RecordPayloadType.getPayloadClassName(this);
   }
 
   /**
@@ -602,18 +625,26 @@ public class HoodieTableConfig extends HoodieConfig {
    * Read the payload class for HoodieRecords from the table properties.
    */
   public String getBootstrapIndexClass() {
-    // There could be tables written with payload class from com.uber.hoodie. Need to transparently
-    // change to org.apache.hudi
-    return getStringOrDefault(BOOTSTRAP_INDEX_CLASS_NAME, getDefaultBootstrapIndexClass(props));
+    if (!props.getBoolean(BOOTSTRAP_INDEX_ENABLE.key(), BOOTSTRAP_INDEX_ENABLE.defaultValue())) {
+      return BootstrapIndexType.NO_OP.getClassName();
+    }
+    String bootstrapIndexClassName;
+    if (contains(BOOTSTRAP_INDEX_TYPE)) {
+      bootstrapIndexClassName = BootstrapIndexType.valueOf(getString(BOOTSTRAP_INDEX_TYPE)).getClassName();
+    } else if (contains(BOOTSTRAP_INDEX_CLASS_NAME)) {
+      bootstrapIndexClassName = getString(BOOTSTRAP_INDEX_CLASS_NAME);
+    } else {
+      bootstrapIndexClassName = BootstrapIndexType.valueOf(BOOTSTRAP_INDEX_TYPE.defaultValue()).getClassName();
+    }
+    return bootstrapIndexClassName;
   }
 
   public static String getDefaultBootstrapIndexClass(Properties props) {
     HoodieConfig hoodieConfig = new HoodieConfig(props);
-    String defaultClass = BOOTSTRAP_INDEX_CLASS_NAME.defaultValue();
     if (!hoodieConfig.getBooleanOrDefault(BOOTSTRAP_INDEX_ENABLE)) {
-      defaultClass = NO_OP_BOOTSTRAP_INDEX_CLASS;
+      return BootstrapIndexType.NO_OP.getClassName();
     }
-    return defaultClass;
-    return defaultClass;
+    return BootstrapIndexType.valueOf(BOOTSTRAP_INDEX_TYPE.defaultValue()).getClassName();
   }
 
   public Option<String> getBootstrapBasePath() {
@@ -697,7 +728,7 @@ public class HoodieTableConfig extends HoodieConfig {
   }
 
   public String getKeyGeneratorClassName() {
-    return getString(KEY_GENERATOR_CLASS_NAME);
+    return KeyGeneratorType.getKeyGeneratorClassName(this);
   }
 
   public HoodieTimelineTimeZone getTimelineTimezone() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 1b29ba8f46f..22e29035657 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -29,9 +29,11 @@ import org.apache.hudi.common.fs.FileSystemRetryConfig;
 import org.apache.hudi.common.fs.HoodieRetryWrapperFileSystem;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.fs.NoOpConsistencyGuard;
+import org.apache.hudi.common.model.BootstrapIndexType;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieTimelineTimeZone;
+import org.apache.hudi.common.model.RecordPayloadType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -47,6 +49,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hadoop.CachingPath;
 import org.apache.hudi.hadoop.SerializablePath;
+import org.apache.hudi.keygen.constant.KeyGeneratorType;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -447,7 +450,10 @@ public class HoodieTableMetaClient implements Serializable {
 
     // Meta fields can be disabled only when either {@code SimpleKeyGenerator}, {@code ComplexKeyGenerator}, {@code NonpartitionedKeyGenerator} is used
     if (!getTableConfig().populateMetaFields()) {
-      String keyGenClass = properties.getProperty(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator");
+      String keyGenClass = KeyGeneratorType.getKeyGeneratorClassName(new HoodieConfig(properties));
+      if (StringUtils.isNullOrEmpty(keyGenClass)) {
+        keyGenClass = "org.apache.hudi.keygen.SimpleKeyGenerator";
+      }
       if (!keyGenClass.equals("org.apache.hudi.keygen.SimpleKeyGenerator")
           && !keyGenClass.equals("org.apache.hudi.keygen.NonpartitionedKeyGenerator")
           && !keyGenClass.equals("org.apache.hudi.keygen.ComplexKeyGenerator")) {
@@ -793,6 +799,7 @@ public class HoodieTableMetaClient implements Serializable {
     private String recordKeyFields;
     private String archiveLogFolder;
     private String payloadClassName;
+    private String payloadType;
     private String recordMergerStrategy;
     private Integer timelineLayoutVersion;
     private String baseFileFormat;
@@ -805,6 +812,7 @@ public class HoodieTableMetaClient implements Serializable {
     private Boolean bootstrapIndexEnable;
     private Boolean populateMetaFields;
     private String keyGeneratorClassProp;
+    private String keyGeneratorType;
     private Boolean hiveStylePartitioningEnable;
     private Boolean urlEncodePartitioning;
     private HoodieTimelineTimeZone commitTimeZone;
@@ -863,6 +871,11 @@ public class HoodieTableMetaClient implements Serializable {
       return this;
     }
 
+    public PropertyBuilder setPayloadType(String payloadType) {
+      this.payloadType = payloadType;
+      return this;
+    }
+
     public PropertyBuilder setRecordMergerStrategy(String recordMergerStrategy) {
       this.recordMergerStrategy = recordMergerStrategy;
       return this;
@@ -927,6 +940,11 @@ public class HoodieTableMetaClient implements Serializable {
       return this;
     }
 
+    public PropertyBuilder setKeyGeneratorType(String keyGeneratorType) {
+      this.keyGeneratorType = keyGeneratorType;
+      return this;
+    }
+
     public PropertyBuilder setHiveStylePartitioningEnable(Boolean hiveStylePartitioningEnable) {
       this.hiveStylePartitioningEnable = hiveStylePartitioningEnable;
       return this;
@@ -1013,8 +1031,9 @@ public class HoodieTableMetaClient implements Serializable {
             hoodieConfig.getString(HoodieTableConfig.ARCHIVELOG_FOLDER));
       }
       if (hoodieConfig.contains(HoodieTableConfig.PAYLOAD_CLASS_NAME)) {
-        setPayloadClassName(
-            hoodieConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME));
+        setPayloadClassName(hoodieConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME));
+      } else if (hoodieConfig.contains(HoodieTableConfig.PAYLOAD_TYPE)) {
+        setPayloadClassName(RecordPayloadType.valueOf(hoodieConfig.getString(HoodieTableConfig.PAYLOAD_TYPE)).getClassName());
       }
       if (hoodieConfig.contains(HoodieTableConfig.RECORD_MERGER_STRATEGY)) {
         setRecordMergerStrategy(
@@ -1031,6 +1050,9 @@ public class HoodieTableMetaClient implements Serializable {
         setBootstrapIndexClass(
             hoodieConfig.getString(HoodieTableConfig.BOOTSTRAP_INDEX_CLASS_NAME));
       }
+      if (hoodieConfig.contains(HoodieTableConfig.BOOTSTRAP_INDEX_TYPE)) {
+        setPayloadClassName(BootstrapIndexType.valueOf(hoodieConfig.getString(HoodieTableConfig.BOOTSTRAP_INDEX_TYPE)).getClassName());
+      }
       if (hoodieConfig.contains(HoodieTableConfig.BOOTSTRAP_BASE_PATH)) {
         setBootstrapBasePath(hoodieConfig.getString(HoodieTableConfig.BOOTSTRAP_BASE_PATH));
       }
@@ -1063,6 +1085,8 @@ public class HoodieTableMetaClient implements Serializable {
       }
       if (hoodieConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)) {
         setKeyGeneratorClassProp(hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME));
+      } else if (hoodieConfig.contains(HoodieTableConfig.KEY_GENERATOR_TYPE)) {
+        setKeyGeneratorClassProp(KeyGeneratorType.valueOf(hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_TYPE)).getClassName());
       }
       if (hoodieConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)) {
         setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
@@ -1101,13 +1125,17 @@ public class HoodieTableMetaClient implements Serializable {
       }
       tableConfig.setValue(HoodieTableConfig.NAME, tableName);
       tableConfig.setValue(HoodieTableConfig.TYPE, tableType.name());
-      tableConfig.setValue(HoodieTableConfig.VERSION,
-          String.valueOf(HoodieTableVersion.current().versionCode()));
-      if (tableType == HoodieTableType.MERGE_ON_READ && payloadClassName != null) {
-        tableConfig.setValue(HoodieTableConfig.PAYLOAD_CLASS_NAME, payloadClassName);
-      }
-      if (tableType == HoodieTableType.MERGE_ON_READ && recordMergerStrategy != null) {
-        tableConfig.setValue(HoodieTableConfig.RECORD_MERGER_STRATEGY, recordMergerStrategy);
+      tableConfig.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.current().versionCode()));
+
+      if (tableType == HoodieTableType.MERGE_ON_READ) {
+        if (null != payloadClassName) {
+          tableConfig.setValue(HoodieTableConfig.PAYLOAD_TYPE, RecordPayloadType.fromClassName(payloadClassName).name());
+        } else if (null != payloadType) {
+          tableConfig.setValue(HoodieTableConfig.PAYLOAD_TYPE, payloadType);
+        }
+        if (recordMergerStrategy != null) {
+          tableConfig.setValue(HoodieTableConfig.RECORD_MERGER_STRATEGY, recordMergerStrategy);
+        }
+        }
       }
 
       if (null != tableCreateSchema) {
@@ -1130,7 +1158,7 @@ public class HoodieTableMetaClient implements Serializable {
       }
 
       if (null != bootstrapIndexClass) {
-        tableConfig.setValue(HoodieTableConfig.BOOTSTRAP_INDEX_CLASS_NAME, bootstrapIndexClass);
+        tableConfig.setValue(HoodieTableConfig.BOOTSTRAP_INDEX_TYPE, BootstrapIndexType.fromClassName(bootstrapIndexClass).name());
       }
 
       if (null != bootstrapIndexEnable) {
@@ -1161,7 +1189,9 @@ public class HoodieTableMetaClient implements Serializable {
         tableConfig.setValue(HoodieTableConfig.POPULATE_META_FIELDS, Boolean.toString(populateMetaFields));
       }
       if (null != keyGeneratorClassProp) {
-        tableConfig.setValue(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME, keyGeneratorClassProp);
+        tableConfig.setValue(HoodieTableConfig.KEY_GENERATOR_TYPE, KeyGeneratorType.fromClassName(keyGeneratorClassProp).name());
+      } else if (null != keyGeneratorType) {
+        tableConfig.setValue(HoodieTableConfig.KEY_GENERATOR_TYPE, keyGeneratorType);
       }
       if (null != hiveStylePartitioningEnable) {
         tableConfig.setValue(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE, Boolean.toString(hiveStylePartitioningEnable));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 2dad6f97946..0b95bbabcb3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -19,8 +19,10 @@
 package org.apache.hudi.common.util;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodiePayloadProps;
+import org.apache.hudi.common.model.RecordPayloadType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 
@@ -77,13 +79,7 @@ public class ConfigUtils {
    * Get payload class.
    */
   public static String getPayloadClass(Properties properties) {
-    String payloadClass = null;
-    if (properties.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key())) {
-      payloadClass = properties.getProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key());
-    } else if (properties.containsKey("hoodie.datasource.write.payload.class")) {
-      payloadClass = properties.getProperty("hoodie.datasource.write.payload.class");
-    }
-    return payloadClass;
+    return RecordPayloadType.getPayloadClassName(new HoodieConfig(properties));
   }
 
   public static List<String> split2List(String param) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorType.java b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorType.java
index 5434b4901c0..8d79acd7db1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorType.java
@@ -20,11 +20,17 @@ package org.apache.hudi.keygen.constant;
 
 import org.apache.hudi.common.config.EnumDescription;
 import org.apache.hudi.common.config.EnumFieldDescription;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.hudi.common.table.HoodieTableConfig.KEY_GENERATOR_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.KEY_GENERATOR_TYPE;
+
 /**
  * Types of {@link org.apache.hudi.keygen.KeyGenerator}.
  */
@@ -33,26 +39,81 @@ import java.util.List;
 public enum KeyGeneratorType {
 
   @EnumFieldDescription("Simple key generator, which takes names of fields to be used for recordKey and partitionPath as configs.")
-  SIMPLE,
+  SIMPLE("org.apache.hudi.keygen.SimpleKeyGenerator"),
+  @EnumFieldDescription("Simple key generator, which takes names of fields to be used for recordKey and partitionPath as configs.")
+  SIMPLE_AVRO("org.apache.hudi.keygen.SimpleAvroKeyGenerator"),
 
   @EnumFieldDescription("Complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs.")
-  COMPLEX,
+  COMPLEX("org.apache.hudi.keygen.ComplexKeyGenerator"),
+  @EnumFieldDescription("Complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs.")
+  COMPLEX_AVRO("org.apache.hudi.keygen.ComplexAvroKeyGenerator"),
 
   @EnumFieldDescription("Timestamp-based key generator, that relies on timestamps for partitioning field. Still picks record key by name.")
-  TIMESTAMP,
+  TIMESTAMP("org.apache.hudi.keygen.TimestampBasedKeyGenerator"),
+  @EnumFieldDescription("Timestamp-based key generator, that relies on timestamps for partitioning field. Still picks record key by name.")
+  TIMESTAMP_AVRO("org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator"),
 
   @EnumFieldDescription("This is a generic implementation type of KeyGenerator where users can configure record key as a single field or "
       + " a combination of fields. Similarly partition path can be configured to have multiple fields or only one field. "
       + " This KeyGenerator expects value for prop \"hoodie.datasource.write.partitionpath.field\" in a specific format. "
       + " For example: "
       + " properties.put(\"hoodie.datasource.write.partitionpath.field\", \"field1:PartitionKeyType1,field2:PartitionKeyType2\").")
-  CUSTOM,
+  CUSTOM("org.apache.hudi.keygen.CustomKeyGenerator"),
+  @EnumFieldDescription("This is a generic implementation type of KeyGenerator where users can configure record key as a single field or "
+      + " a combination of fields. Similarly partition path can be configured to have multiple fields or only one field. "
+      + " This KeyGenerator expects value for prop \"hoodie.datasource.write.partitionpath.field\" in a specific format. "
+      + " For example: "
+      + " properties.put(\"hoodie.datasource.write.partitionpath.field\", \"field1:PartitionKeyType1,field2:PartitionKeyType2\").")
+  CUSTOM_AVRO("org.apache.hudi.keygen.CustomAvroKeyGenerator"),
 
   @EnumFieldDescription("Simple Key generator for non-partitioned tables.")
-  NON_PARTITION,
+  NON_PARTITION("org.apache.hudi.keygen.NonpartitionedKeyGenerator"),
+  @EnumFieldDescription("Simple Key generator for non-partitioned tables.")
+  NON_PARTITION_AVRO("org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator"),
 
   @EnumFieldDescription("Key generator for deletes using global indices.")
-  GLOBAL_DELETE;
+  GLOBAL_DELETE("org.apache.hudi.keygen.GlobalDeleteKeyGenerator"),
+  @EnumFieldDescription("Key generator for deletes using global indices.")
+  GLOBAL_DELETE_AVRO("org.apache.hudi.keygen.GlobalAvroDeleteKeyGenerator"),
+
+  @EnumFieldDescription("Automatic record key generation.")
+  AUTO_RECORD("org.apache.hudi.keygen.AutoRecordGenWrapperKeyGenerator"),
+  @EnumFieldDescription("Automatic record key generation.")
+  AUTO_RECORD_AVRO("org.apache.hudi.keygen.AutoRecordGenWrapperAvroKeyGenerator"),
+
+  @EnumFieldDescription("Custom key generator for the Hudi table metadata.")
+  HOODIE_TABLE_METADATA("org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator"),
+
+  @EnumFieldDescription("Custom spark-sql specific KeyGenerator overriding behavior handling TimestampType partition values.")
+  SPARK_SQL("org.apache.spark.sql.hudi.command.SqlKeyGenerator"),
+
+  @EnumFieldDescription("A KeyGenerator which use the uuid as the record key.")
+  SPARK_SQL_UUID("org.apache.spark.sql.hudi.command.UuidKeyGenerator"),
+
+  @EnumFieldDescription("Meant to be used internally for the spark sql MERGE INTO command.")
+  SPARK_SQL_MERGE_INTO("org.apache.spark.sql.hudi.command.MergeIntoKeyGenerator"),
+
+  @EnumFieldDescription("A test KeyGenerator for deltastreamer tests.")
+  STREAMER_TEST("org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestGenerator");
+
+  private final String className;
+
+  KeyGeneratorType(String className) {
+    this.className = className;
+  }
+
+  public String getClassName() {
+    return className;
+  }
+
+  public static KeyGeneratorType fromClassName(String className) {
+    for (KeyGeneratorType type : KeyGeneratorType.values()) {
+      if (type.getClassName().equals(className)) {
+        return type;
+      }
+    }
+    throw new IllegalArgumentException("No KeyGeneratorType found for class name: " + className);
+  }
 
   public static List<String> getNames() {
     List<String> names = new ArrayList<>(KeyGeneratorType.values().length);
@@ -60,4 +121,14 @@ public enum KeyGeneratorType {
         .forEach(x -> names.add(x.name()));
     return names;
   }
+
+  @Nullable
+  public static String getKeyGeneratorClassName(HoodieConfig config) {
+    if (config.contains(KEY_GENERATOR_CLASS_NAME)) {
+      return config.getString(KEY_GENERATOR_CLASS_NAME);
+    } else if (config.contains(KEY_GENERATOR_TYPE)) {
+      return KeyGeneratorType.valueOf(config.getString(KEY_GENERATOR_TYPE)).getClassName();
+    }
+    return null;
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index d528c325b29..83816044acb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -134,6 +134,10 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
               && !conf.contains(FlinkOptions.PAYLOAD_CLASS_NAME)) {
             conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, tableConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME));
           }
+          if (tableConfig.contains(HoodieTableConfig.PAYLOAD_TYPE)
+              && !conf.contains(FlinkOptions.PAYLOAD_CLASS_NAME)) {
+            conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, tableConfig.getPayloadClass());
+          }
         });
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index ddc9d55e50c..ff7f33d8beb 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -47,8 +47,8 @@ import scala.language.implicitConversions
  */
 
 /**
-  * Options supported for reading hoodie tables.
-  */
+ * Options supported for reading hoodie tables.
+ */
 object DataSourceReadOptions {
 
   val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot"
@@ -272,8 +272,8 @@ object DataSourceReadOptions {
 }
 
 /**
-  * Options supported for writing hoodie tables.
-  */
+ * Options supported for writing hoodie tables.
+ */
 object DataSourceWriteOptions {
 
   val BULK_INSERT_OPERATION_OPT_VAL = WriteOperationType.BULK_INSERT.value
@@ -325,11 +325,11 @@ object DataSourceWriteOptions {
   val SPARK_SQL_WRITES_PREPPED_KEY = "_hoodie.spark.sql.writes.prepped";
 
   /**
-    * May be derive partition path from incoming df if not explicitly set.
-    *
-    * @param optParams Parameters to be translated
-    * @return Parameters after translation
-    */
+   * May be derive partition path from incoming df if not explicitly set.
+   *
+   * @param optParams Parameters to be translated
+   * @return Parameters after translation
+   */
   def mayBeDerivePartitionPath(optParams: Map[String, String]): Map[String, String] = {
     var translatedOptParams = optParams
     // translate the api partitionBy of spark DataFrameWriter to PARTITIONPATH_FIELD
@@ -384,6 +384,8 @@ object DataSourceWriteOptions {
    */
   val PAYLOAD_CLASS_NAME = HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME
 
+  val PAYLOAD_TYPE = HoodieWriteConfig.WRITE_PAYLOAD_TYPE
+
   /**
    * HoodieMerger will replace the payload to process the merge of data
    * and provide the same capabilities as the payload
@@ -475,7 +477,7 @@ object DataSourceWriteOptions {
     .defaultValue("false")
     .markAdvanced()
     .withDocumentation("If set to true, records from the incoming dataframe will not overwrite existing records with the same key during the write operation. " +
-    "This config is deprecated as of 0.14.0. Please use hoodie.datasource.insert.dup.policy instead.");
+      "This config is deprecated as of 0.14.0. Please use hoodie.datasource.insert.dup.policy instead.");
 
   val PARTITIONS_TO_DELETE: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.partitions.to.delete")
@@ -557,7 +559,7 @@ object DataSourceWriteOptions {
     .withValidValues(NONE_INSERT_DUP_POLICY, DROP_INSERT_DUP_POLICY, FAIL_INSERT_DUP_POLICY)
     .withDocumentation("When operation type is set to \"insert\", users can optionally enforce a dedup policy. This policy will be employed "
       + " when records being ingested already exists in storage. Default policy is none and no action will be taken. Another option is to choose " +
-    " \"drop\", on which matching records from incoming will be dropped and the rest will be ingested. Third option is \"fail\" which will " +
+      " \"drop\", on which matching records from incoming will be dropped and the rest will be ingested. Third option is \"fail\" which will " +
       "fail the write operation when same records are re-ingested. In other words, a given record as deduced by the key generation policy " +
       "can be ingested only once to the target table of interest.")
 
@@ -904,7 +906,7 @@ object DataSourceOptionsHelper {
   private val log = LoggerFactory.getLogger(DataSourceOptionsHelper.getClass)
 
   // put all the configs with alternatives here
-  val allConfigsWithAlternatives = List(
+  private val allConfigsWithAlternatives = List(
     DataSourceReadOptions.QUERY_TYPE,
     DataSourceWriteOptions.TABLE_TYPE,
     HoodieTableConfig.BASE_FILE_FORMAT,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7828cc7ee5a..9231472a799 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -54,6 +54,7 @@ import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileNullability
 import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
+import org.apache.hudi.keygen.constant.KeyGeneratorType
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName
 import org.apache.hudi.keygen.{BaseKeyGenerator, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
@@ -226,13 +227,21 @@ object HoodieSparkSqlWriter {
         val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
         val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
         val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT);
+        val payloadClass =
+          if (StringUtils.nonEmpty(hoodieConfig.getString(DataSourceWriteOptions.PAYLOAD_CLASS_NAME)))
+            hoodieConfig.getString(DataSourceWriteOptions.PAYLOAD_CLASS_NAME)
+          else RecordPayloadType.getPayloadClassName(hoodieConfig)
+        val keyGenProp =
+          if (StringUtils.nonEmpty(hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)))
+            hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)
+          else KeyGeneratorType.getKeyGeneratorClassName(hoodieConfig)
         HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(tableType)
           .setDatabaseName(databaseName)
           .setTableName(tblName)
           .setBaseFileFormat(baseFileFormat)
           .setArchiveLogFolder(archiveLogFolder)
-          .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME))
+          .setPayloadClassName(payloadClass)
           // we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value,
           // but we are interested in what user has set, hence fetching from optParams.
           .setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
@@ -241,7 +250,7 @@ object HoodieSparkSqlWriter {
           .setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
           .setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED))
           .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
-          .setKeyGeneratorClassProp(hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key))
+          .setKeyGeneratorClassProp(keyGenProp)
           .set(timestampKeyGeneratorConfigs)
           .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
           .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
@@ -806,10 +815,14 @@ object HoodieSparkSqlWriter {
         val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
         val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
         val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
+        val payloadClass =
+          if (StringUtils.nonEmpty(hoodieConfig.getString(DataSourceWriteOptions.PAYLOAD_CLASS_NAME)))
+            hoodieConfig.getString(DataSourceWriteOptions.PAYLOAD_CLASS_NAME)
+          else RecordPayloadType.getPayloadClassName(hoodieConfig)
         val keyGenProp =
           if (StringUtils.nonEmpty(hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)))
             hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)
-          else hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
+          else KeyGeneratorType.getKeyGeneratorClassName(hoodieConfig)
         val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenProp, parameters)
         val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(
           HoodieTableConfig.POPULATE_META_FIELDS.key(),
@@ -826,7 +839,7 @@ object HoodieSparkSqlWriter {
           .setTableName(tableName)
           .setRecordKeyFields(recordKeyFields)
           .setArchiveLogFolder(archiveLogFolder)
-          .setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME))
+          .setPayloadClassName(payloadClass)
           .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
           .setBootstrapIndexClass(bootstrapIndexClass)
           .setBaseFileFormat(baseFileFormat)
@@ -1162,9 +1175,8 @@ object HoodieSparkSqlWriter {
         }
     }
     val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptsWithMappedTableConfig.toMap)
-    if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
-      && mergedParams.contains(KEYGENERATOR_CLASS_NAME.key)) {
-      mergedParams(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = mergedParams(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key)
+    if (mergedParams.contains(KEYGENERATOR_CLASS_NAME.key) && !mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_TYPE.key)) {
+      mergedParams(HoodieTableConfig.KEY_GENERATOR_TYPE.key) = KeyGeneratorType.fromClassName(mergedParams(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key)).name
     }
     // use preCombineField to fill in PAYLOAD_ORDERING_FIELD_PROP_KEY
     if (mergedParams.contains(PRECOMBINE_FIELD.key())) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 5230c34984f..b2c44cc3330 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.HiveSyncConfigHolder
+import org.apache.hudi.keygen.constant.KeyGeneratorType
 import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.util.SparkKeyGenUtils
@@ -192,7 +193,7 @@ object HoodieWriterUtils {
         }
 
         val datasourceKeyGen = getOriginKeyGenerator(params)
-        val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
+        val tableConfigKeyGen = KeyGeneratorType.getKeyGeneratorClassName(tableConfig)
         if (null != datasourceKeyGen && null != tableConfigKeyGen
           && datasourceKeyGen != tableConfigKeyGen) {
           diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
@@ -228,7 +229,7 @@ object HoodieWriterUtils {
     val diffConfigs = StringBuilder.newBuilder
 
     if (null != tableConfig) {
-      val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
+      val tableConfigKeyGen = KeyGeneratorType.getKeyGeneratorClassName(tableConfig)
       if (null != tableConfigKeyGen && null != datasourceKeyGen) {
         val nonPartitionedTableConfig = tableConfigKeyGen.equals(classOf[NonpartitionedKeyGenerator].getCanonicalName)
         val simpleKeyDataSourceConfig = datasourceKeyGen.equals(classOf[SimpleKeyGenerator].getCanonicalName)
@@ -256,13 +257,14 @@ object HoodieWriterUtils {
     }
   }
 
-  val sparkDatasourceConfigsToTableConfigsMap = Map(
+  private val sparkDatasourceConfigsToTableConfigsMap = Map(
     TABLE_NAME -> HoodieTableConfig.NAME,
     TABLE_TYPE -> HoodieTableConfig.TYPE,
     PRECOMBINE_FIELD -> HoodieTableConfig.PRECOMBINE_FIELD,
     PARTITIONPATH_FIELD -> HoodieTableConfig.PARTITION_FIELDS,
     RECORDKEY_FIELD -> HoodieTableConfig.RECORDKEY_FIELDS,
     PAYLOAD_CLASS_NAME -> HoodieTableConfig.PAYLOAD_CLASS_NAME,
+    PAYLOAD_TYPE -> HoodieTableConfig.PAYLOAD_TYPE,
     RECORD_MERGER_STRATEGY -> HoodieTableConfig.RECORD_MERGER_STRATEGY
   )
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index a77a5dcbe2f..ee041e94b87 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.keygen.constant.KeyGeneratorType
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper}
 import org.apache.spark.internal.Logging
@@ -304,6 +304,10 @@ class HoodieCatalogTable(val spark: SparkSession, var 
table: CatalogTable) exten
       extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
         HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
           originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
+    } else if 
(originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_TYPE.key)) {
+      extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
+        HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
+          
KeyGeneratorType.valueOf(originTableConfig(HoodieTableConfig.KEY_GENERATOR_TYPE.key)).getClassName)
     } else {
       val primaryKeys = 
table.properties.getOrElse(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName, 
table.storage.properties.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)).toString
       val partitions = table.partitionColumnNames.mkString(",")
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index abe98bb46cf..66c81ae331e 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -69,6 +69,13 @@ object HoodieOptionConfig {
     .defaultValue(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue())
     .build()
 
+  val SQL_PAYLOAD_TYPE: HoodieSQLOption[String] = buildConf()
+    .withSqlKey("payloadType")
+    .withHoodieKey(DataSourceWriteOptions.PAYLOAD_TYPE.key)
+    .withTableConfigKey(HoodieTableConfig.PAYLOAD_TYPE.key)
+    .defaultValue(DataSourceWriteOptions.PAYLOAD_TYPE.defaultValue())
+    .build()
+
   val SQL_RECORD_MERGER_STRATEGY: HoodieSQLOption[String] = buildConf()
     .withSqlKey("recordMergerStrategy")
     .withHoodieKey(DataSourceWriteOptions.RECORD_MERGER_STRATEGY.key)
@@ -193,7 +200,7 @@ object HoodieOptionConfig {
   // extract primaryKey, preCombineField, type options
   def extractSqlOptions(options: Map[String, String]): Map[String, String] = {
     val sqlOptions = mapTableConfigsToSqlOptions(options)
-    val targetOptions = sqlOptionKeyToWriteConfigKey.keySet -- 
Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_RECORD_MERGER_STRATEGY.sqlKeyName)
+    val targetOptions = sqlOptionKeyToWriteConfigKey.keySet -- 
Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_RECORD_MERGER_STRATEGY.sqlKeyName) 
-- Set(SQL_PAYLOAD_TYPE.sqlKeyName)
     sqlOptions.filterKeys(targetOptions.contains)
   }
 
@@ -233,7 +240,7 @@ object HoodieOptionConfig {
   def makeOptionsCaseInsensitive(sqlOptions: Map[String, String]): Map[String, 
String] = {
     // Make Keys Case Insensitive
     val standardOptions = Seq(SQL_KEY_TABLE_PRIMARY_KEY, 
SQL_KEY_PRECOMBINE_FIELD,
-    SQL_KEY_TABLE_TYPE, SQL_PAYLOAD_CLASS, SQL_RECORD_MERGER_STRATEGY).map(key 
=> key.sqlKeyName)
+    SQL_KEY_TABLE_TYPE, SQL_PAYLOAD_CLASS, SQL_RECORD_MERGER_STRATEGY, 
SQL_PAYLOAD_TYPE).map(key => key.sqlKeyName)
 
     sqlOptions.map(option => {
       standardOptions.find(x => 
x.toLowerCase().contains(option._1.toLowerCase())) match {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
index 90ab2f9cbab..49059d20389 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
@@ -21,6 +21,7 @@ package org.apache.hudi.cli;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieTimelineTimeZone;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -41,6 +42,7 @@ import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
 import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.util.SparkKeyGenUtils;
 
@@ -251,8 +253,8 @@ public class BootstrapExecutorUtils implements Serializable 
{
             URL_ENCODE_PARTITIONING.key(),
             Boolean.parseBoolean(URL_ENCODE_PARTITIONING.defaultValue())))
         .setCommitTimezone(HoodieTimelineTimeZone.valueOf(props.getString(
-                TIMELINE_TIMEZONE.key(),
-                String.valueOf(TIMELINE_TIMEZONE.defaultValue()))))
+            TIMELINE_TIMEZONE.key(),
+            String.valueOf(TIMELINE_TIMEZONE.defaultValue()))))
         .setPartitionMetafileUseBaseFormat(props.getBoolean(
             PARTITION_METAFILE_USE_BASE_FORMAT.key(),
             PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
@@ -270,7 +272,7 @@ public class BootstrapExecutorUtils implements Serializable 
{
     } else if 
(StringUtils.nonEmpty(props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE.key(),
 null))) {
       keyGenClass = 
HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(props);
     } else {
-      keyGenClass = 
props.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), 
SimpleKeyGenerator.class.getName());
+      keyGenClass = KeyGeneratorType.getKeyGeneratorClassName(new 
HoodieConfig(props));
     }
     props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), keyGenClass);
     String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index bc3540ebf50..642592a6c9f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, 
HoodieTableMetaClient}
 import org.apache.hudi.common.util.PartitionPathEncodeUtils.escapePathName
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
-import org.apache.hudi.keygen.SimpleKeyGenerator
+import org.apache.hudi.keygen.constant.KeyGeneratorType
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, 
HoodieCatalogTable}
@@ -143,7 +143,7 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     assertResult("dt")(tableConfig(HoodieTableConfig.PARTITION_FIELDS.key))
     assertResult("id")(tableConfig(HoodieTableConfig.RECORDKEY_FIELDS.key))
     assertResult("ts")(tableConfig(HoodieTableConfig.PRECOMBINE_FIELD.key))
-    
assertResult(classOf[SimpleKeyGenerator].getCanonicalName)(tableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
+    
assertResult(KeyGeneratorType.SIMPLE.name())(tableConfig(HoodieTableConfig.KEY_GENERATOR_TYPE.key))
     assertResult("default")(tableConfig(HoodieTableConfig.DATABASE_NAME.key()))
     assertResult(tableName)(tableConfig(HoodieTableConfig.NAME.key()))
     assertFalse(tableConfig.contains(OPERATION.key()))
@@ -1204,8 +1204,7 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
         .setBasePath(tablePath)
         .setConf(spark.sessionState.newHadoopConf())
         .build()
-      val realKeyGenerator =
-        
metaClient.getTableConfig.getProps.asScala.toMap.get(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key).get
+      val realKeyGenerator = metaClient.getTableConfig.getKeyGeneratorClassName
       assertResult(targetGenerator)(realKeyGenerator)
     }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
index 43fcb79ecf9..985300c44c2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, 
HoodieRecordMerger, OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
@@ -31,17 +32,19 @@ class TestHoodieOptionConfig extends 
SparkClientFunctionalTestHarness {
   def testWithDefaultSqlOptions(): Unit = {
     val ops1 = Map("primaryKey" -> "id")
     val with1 = HoodieOptionConfig.withDefaultSqlOptions(ops1)
-    assertTrue(with1.size == 4)
+    assertTrue(with1.size == 5)
     assertTrue(with1("primaryKey") == "id")
     assertTrue(with1("type") == "cow")
     assertTrue(with1("payloadClass") == 
classOf[OverwriteWithLatestAvroPayload].getName)
     assertTrue(with1("recordMergerStrategy") == 
HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
+    assertTrue(with1("payloadType") == 
DataSourceWriteOptions.PAYLOAD_TYPE.defaultValue)
 
     val ops2 = Map("primaryKey" -> "id",
       "preCombineField" -> "timestamp",
       "type" -> "mor",
       "payloadClass" -> classOf[DefaultHoodieRecordPayload].getName,
-      "recordMergerStrategy" -> HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
+      "recordMergerStrategy" -> 
HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID,
+      "payloadType" -> DataSourceWriteOptions.PAYLOAD_TYPE.defaultValue
     )
     val with2 = HoodieOptionConfig.withDefaultSqlOptions(ops2)
     assertTrue(ops2 == with2)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
index eaf977e82d1..15a9ed675c3 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
@@ -118,7 +118,7 @@ class TestRepairsProcedure extends 
HoodieSparkProcedureTestBase {
           |[hoodie.datasource.write.partitionpath.urlencode,false,null]
           |[hoodie.table.checksum,,]
           |[hoodie.table.create.schema,,]
-          
|[hoodie.table.keygenerator.class,org.apache.hudi.keygen.NonpartitionedKeyGenerator,null]
+          |[hoodie.table.keygenerator.type,NON_PARTITION,null]
           |[hoodie.table.name,,]
           |[hoodie.table.precombine.field,ts,null]
           |[hoodie.table.recordkey.fields,id,null]
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 9c708144931..96d888f5f07 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -26,7 +26,6 @@ import org.apache.hudi.HoodieSparkUtils$;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
-import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.LockConfiguration;
@@ -104,7 +103,6 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.AnalysisException;
@@ -160,6 +158,7 @@ import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
 import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
 import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
 import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.createMetaClient;
 import static 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
 import static 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommitsAfterRollback;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
@@ -378,8 +377,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   @Test
-  public void testPropsWithInvalidKeyGenerator() throws Exception {
-    Exception e = assertThrows(SparkException.class, () -> {
+  public void testPropsWithInvalidKeyGenerator() {
+    Exception e = assertThrows(IllegalArgumentException.class, () -> {
       String tableBasePath = basePath + "/test_table_invalid_key_gen";
       HoodieDeltaStreamer deltaStreamer =
           new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT,
@@ -388,7 +387,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     }, "Should error out when setting the key generator class property to an 
invalid value");
     // expected
     LOG.debug("Expected error during getting the key generator", e);
-    assertTrue(e.getMessage().contains("Could not load key generator class 
invalid"));
+    assertTrue(e.getMessage().contains("No KeyGeneratorType found for class 
name"));
   }
 
   private static Stream<Arguments> provideInferKeyGenArgs() {
@@ -1395,7 +1394,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   @Test
-  public void testNullSchemaProvider() throws Exception {
+  public void testNullSchemaProvider() {
     String tableBasePath = basePath + "/test_table";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT,
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), 
PROPS_FILENAME_TEST_SOURCE, true,
@@ -1423,14 +1422,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
 
     //now assert that hoodie.properties file now has updated payload class name
-    Properties props = new Properties();
-    String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties";
-    FileSystem fs = FSUtils.getFs(cfg.targetBasePath, 
jsc.hadoopConfiguration());
-    try (FSDataInputStream inputStream = fs.open(new Path(metaPath))) {
-      props.load(inputStream);
-    }
-
-    assertEquals(new 
HoodieConfig(props).getString(HoodieTableConfig.PAYLOAD_CLASS_NAME), 
DummyAvroPayload.class.getName());
+    HoodieTableMetaClient metaClient = createMetaClient(jsc, dataSetBasePath, 
false);
+    assertEquals(metaClient.getTableConfig().getPayloadClass(), 
DummyAvroPayload.class.getName());
   }
 
   @Test
@@ -1443,13 +1436,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     assertRecordCount(1000, dataSetBasePath, sqlContext);
 
     //now assert that hoodie.properties file now has updated payload class name
-    Properties props = new Properties();
-    String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties";
-    FileSystem fs = FSUtils.getFs(cfg.targetBasePath, 
jsc.hadoopConfiguration());
-    try (FSDataInputStream inputStream = fs.open(new Path(metaPath))) {
-      props.load(inputStream);
-    }
-    assertEquals(new 
HoodieConfig(props).getString(HoodieTableConfig.PAYLOAD_CLASS_NAME), 
PartialUpdateAvroPayload.class.getName());
+    HoodieTableMetaClient metaClient = createMetaClient(jsc, dataSetBasePath, 
false);
+    assertEquals(metaClient.getTableConfig().getPayloadClass(), 
PartialUpdateAvroPayload.class.getName());
   }
 
   @Test
@@ -2588,13 +2576,6 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     }
   }
 
-  public static class TestTableLevelGenerator extends SimpleKeyGenerator {
-
-    public TestTableLevelGenerator(TypedProperties props) {
-      super(props);
-    }
-  }
-
   public static class DummyAvroPayload extends OverwriteWithLatestAvroPayload {
 
     public DummyAvroPayload(GenericRecord gr, Comparable orderingVal) {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
index a8ee0c694fd..26ea61e31fe 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
@@ -263,7 +263,7 @@ public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBa
       switch (tableExecutionContext.getTableName()) {
         case "dummy_table_short_trip":
           String tableLevelKeyGeneratorClass = 
tableExecutionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key());
-          
assertEquals(TestHoodieDeltaStreamer.TestTableLevelGenerator.class.getName(), 
tableLevelKeyGeneratorClass);
+          assertEquals(TestHoodieDeltaStreamer.TestGenerator.class.getName(), 
tableLevelKeyGeneratorClass);
           List<String> transformerClass = 
tableExecutionContext.getConfig().transformerClassNames;
           assertEquals(1, transformerClass.size());
           
assertEquals("org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestIdentityTransformer",
 transformerClass.get(0));
diff --git 
a/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties
 
b/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties
index 25b392d580a..d415e19eb20 100644
--- 
a/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties
+++ 
b/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties
@@ -22,7 +22,7 @@ hoodie.deltastreamer.source.kafka.topic=topic2
 hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
 hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
 hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table
-hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestTableLevelGenerator
+hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestGenerator
 
hoodie.deltastreamer.schemaprovider.registry.baseUrl=http://localhost:8081/subjects/
 hoodie.deltastreamer.schemaprovider.registry.urlSuffix=-value/versions/latest
 
hoodie.deltastreamer.transformer.class=org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestIdentityTransformer

Reply via email to