This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a34067826c2 [HUDI-5514][HUDI-5574][HUDI-5604][HUDI-5535] Adding auto 
generation of record keys support to Hudi/Spark (#8107)
a34067826c2 is described below

commit a34067826c2f367be942052cf86d64a1cfdd01b8
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue May 9 05:33:56 2023 -0700

    [HUDI-5514][HUDI-5574][HUDI-5604][HUDI-5535] Adding auto generation of 
record keys support to Hudi/Spark (#8107)
    
    * Adding auto generation of record keys support to Hudi
    * Fixing non partitioned key generator for inference
---
 .../AutoRecordGenWrapperAvroKeyGenerator.java      |  83 +++++++
 .../hudi/keygen/ComplexAvroKeyGenerator.java       |   5 +-
 .../apache/hudi/keygen/CustomAvroKeyGenerator.java |  10 +-
 .../hudi/keygen/GlobalAvroDeleteKeyGenerator.java  |   6 +-
 .../java/org/apache/hudi/keygen/KeyGenUtils.java   |  79 ++++--
 .../keygen/NonpartitionedAvroKeyGenerator.java     |   7 +-
 .../apache/hudi/keygen/SimpleAvroKeyGenerator.java |  11 +-
 .../keygen/TimestampBasedAvroKeyGenerator.java     |   2 +-
 .../factory/HoodieAvroKeyGeneratorFactory.java     |  28 ++-
 .../org/apache/hudi/keygen/TestKeyGenUtils.java    |  13 +-
 ...estCreateAvroKeyGeneratorByTypeWithFactory.java |  13 +
 .../keygen/AutoRecordGenWrapperKeyGenerator.java   | 108 +++++++++
 .../apache/hudi/keygen/ComplexKeyGenerator.java    |   5 +-
 .../org/apache/hudi/keygen/CustomKeyGenerator.java |  20 +-
 .../hudi/keygen/GlobalDeleteKeyGenerator.java      |   4 +-
 .../hudi/keygen/NonpartitionedKeyGenerator.java    |  10 +-
 .../org/apache/hudi/keygen/SimpleKeyGenerator.java |  15 +-
 .../hudi/keygen/TimestampBasedKeyGenerator.java    |   5 +-
 .../factory/HoodieSparkKeyGeneratorFactory.java    |  15 +-
 .../hudi/HoodieDatasetBulkInsertHelper.scala       |  24 +-
 .../org/apache/hudi/util/SparkKeyGenUtils.scala    |  41 ++--
 .../hudi/common/table/HoodieTableConfig.java       |  18 +-
 .../org/apache/hudi/keygen/BaseKeyGenerator.java   |   1 +
 .../testsuite/HoodieDeltaStreamerWrapper.java      |   4 +-
 .../apache/hudi/AutoRecordKeyGenerationUtils.scala |  50 ++++
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   6 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  57 +++--
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |   7 +-
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  |   1 +
 .../apache/spark/sql/hudi/HoodieOptionConfig.scala |   9 +-
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |   8 +-
 .../spark/sql/hudi/command/SqlKeyGenerator.scala   |  14 +-
 .../TestHoodieDatasetBulkInsertHelper.java         |  24 +-
 .../hudi/keygen/TestComplexKeyGenerator.java       |   6 +-
 .../apache/hudi/keygen/TestCustomKeyGenerator.java |  26 +-
 .../keygen/TestGlobalDeleteRecordGenerator.java    |   6 +-
 .../keygen/TestNonpartitionedKeyGenerator.java     |   6 +-
 .../apache/hudi/keygen/TestSimpleKeyGenerator.java |   6 +-
 .../TestCreateKeyGeneratorByTypeWithFactory.java   |  13 +
 .../org/apache/hudi/TestDataSourceDefaults.scala   |  10 +-
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala |   2 +-
 .../apache/hudi/functional/CommonOptionUtils.scala |  56 +++++
 .../TestAutoGenerationOfRecordKeys.scala           | 264 +++++++++++++++++++++
 .../apache/hudi/functional/TestCOWDataSource.scala |  16 +-
 .../hudi/functional/TestStreamingSource.scala      |   4 +
 .../spark/sql/hudi/TestHoodieOptionConfig.scala    |  10 -
 .../apache/spark/sql/hudi/TestInsertTable.scala    |  40 +++-
 .../hudi/utilities/deltastreamer/DeltaSync.java    | 103 ++++----
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  40 +++-
 49 files changed, 1042 insertions(+), 269 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperAvroKeyGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperAvroKeyGenerator.java
new file mode 100644
index 00000000000..a8ae48e1d67
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperAvroKeyGenerator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.avro.generic.GenericRecord;
+
+import java.util.List;
+
+/**
+ * A wrapper key generator to intercept getRecordKey calls for auto record key 
generator.
+ * <ol>
+ *   <li>Generated keys will be unique not only w/in provided 
[[org.apache.spark.sql.DataFrame]], but
+ *   globally unique w/in the target table</li>
+ *   <li>Generated keys have minimal overhead (to compute, persist and 
read)</li>
+ * </ol>
+ *
+ * Keys adhere to the following format:
+ *
+ * [instantTime]_[PartitionId]_[RowId]
+ *
+ * where
+ * instantTime refers to the commit time of the batch being ingested.
+ * PartitionId refers to spark's partition Id.
+ * RowId refers to the row index within the spark partition.
+ */
+public class AutoRecordGenWrapperAvroKeyGenerator extends BaseKeyGenerator {
+
+  private final BaseKeyGenerator keyGenerator;
+  private final int partitionId;
+  private final String instantTime;
+  private int rowId;
+
+  public AutoRecordGenWrapperAvroKeyGenerator(TypedProperties config, 
BaseKeyGenerator keyGenerator) {
+    super(config);
+    this.keyGenerator = keyGenerator;
+    this.rowId = 0;
+    this.partitionId = 
config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
+    this.instantTime = 
config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
+  }
+
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++);
+  }
+
+  @Override
+  public String getPartitionPath(GenericRecord record) {
+    return keyGenerator.getPartitionPath(record);
+  }
+
+  @Override
+  public List<String> getRecordKeyFieldNames() {
+    return keyGenerator.getRecordKeyFieldNames();
+  }
+
+  public List<String> getPartitionPathFields() {
+    return keyGenerator.getPartitionPathFields();
+  }
+
+  public boolean isConsistentLogicalTimestampEnabled() {
+    return keyGenerator.isConsistentLogicalTimestampEnabled();
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
index 9ff5c522e45..1c4860779cb 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
@@ -32,10 +32,7 @@ public class ComplexAvroKeyGenerator extends 
BaseKeyGenerator {
 
   public ComplexAvroKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
-        .map(String::trim)
-        .filter(s -> !s.isEmpty())
-        .collect(Collectors.toList());
+    this.recordKeyFields = KeyGenUtils.getRecordKeyFields(props);
     this.partitionPathFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(","))
         .map(String::trim)
         .filter(s -> !s.isEmpty())
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
index 77377de7ab8..13ae1d50528 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
@@ -20,12 +20,14 @@ package org.apache.hudi.keygen;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieKeyException;
 import org.apache.hudi.exception.HoodieKeyGeneratorException;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.stream.Collectors;
 
 /**
@@ -43,7 +45,7 @@ import java.util.stream.Collectors;
  */
 public class CustomAvroKeyGenerator extends BaseKeyGenerator {
 
-  private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+  public static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
   public static final String SPLIT_REGEX = ":";
 
   /**
@@ -55,7 +57,11 @@ public class CustomAvroKeyGenerator extends BaseKeyGenerator 
{
 
   public CustomAvroKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList());
+    this.recordKeyFields = 
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
 null))
+        .map(recordKeyConfigValue ->
+            Arrays.stream(recordKeyConfigValue.split(","))
+                .map(String::trim).collect(Collectors.toList())
+        ).orElse(Collections.emptyList());
     this.partitionPathFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList());
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
index dc0bc3cef2f..517798e7e7c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
@@ -19,10 +19,8 @@ package org.apache.hudi.keygen;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -31,11 +29,9 @@ import java.util.List;
  */
 public class GlobalAvroDeleteKeyGenerator extends BaseKeyGenerator {
 
-  private static final String EMPTY_PARTITION = "";
-
   public GlobalAvroDeleteKeyGenerator(TypedProperties config) {
     super(config);
-    this.recordKeyFields = 
Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","));
+    this.recordKeyFields = KeyGenUtils.getRecordKeyFields(config);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index fdc17f1c799..5c24dd09291 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieKeyException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.keygen.parser.BaseHoodieDateTimeParser;
 
@@ -37,6 +38,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class KeyGenUtils {
 
@@ -48,6 +50,9 @@ public class KeyGenUtils {
   public static final String DEFAULT_RECORD_KEY_PARTS_SEPARATOR = ",";
   public static final String DEFAULT_COMPOSITE_KEY_FILED_VALUE = ":";
 
+  public static final String RECORD_KEY_GEN_PARTITION_ID_CONFIG = 
"_hoodie.record.key.gen.partition.id";
+  public static final String RECORD_KEY_GEN_INSTANT_TIME_CONFIG = 
"_hoodie.record.key.gen.instant.time";
+
   /**
    * Infers the key generator type based on the record key and partition 
fields.
    * <p>
@@ -60,11 +65,28 @@ public class KeyGenUtils {
    * @return Inferred key generator type.
    */
   public static KeyGeneratorType inferKeyGeneratorType(
-      String recordsKeyFields, String partitionFields) {
+      Option<String> recordsKeyFields, String partitionFields) {
+    boolean autoGenerateRecordKeys = !recordsKeyFields.isPresent();
+    if (autoGenerateRecordKeys) {
+      return inferKeyGeneratorTypeForAutoKeyGen(partitionFields);
+    } else {
+      if (!StringUtils.isNullOrEmpty(partitionFields)) {
+        int numPartFields = partitionFields.split(",").length;
+        int numRecordKeyFields = recordsKeyFields.get().split(",").length;
+        if (numPartFields == 1 && numRecordKeyFields == 1) {
+          return KeyGeneratorType.SIMPLE;
+        }
+        return KeyGeneratorType.COMPLEX;
+      }
+      return KeyGeneratorType.NON_PARTITION;
+    }
+  }
+
+  // When auto record key gen is enabled, our inference will be based on 
partition path only.
+  private static KeyGeneratorType inferKeyGeneratorTypeForAutoKeyGen(String 
partitionFields) {
     if (!StringUtils.isNullOrEmpty(partitionFields)) {
       int numPartFields = partitionFields.split(",").length;
-      int numRecordKeyFields = recordsKeyFields.split(",").length;
-      if (numPartFields == 1 && numRecordKeyFields == 1) {
+      if (numPartFields == 1) {
         return KeyGeneratorType.SIMPLE;
       }
       return KeyGeneratorType.COMPLEX;
@@ -85,7 +107,8 @@ public class KeyGenUtils {
 
   /**
    * Fetches partition path from the GenericRecord.
-   * @param genericRecord generic record of interest.
+   *
+   * @param genericRecord   generic record of interest.
    * @param keyGeneratorOpt Optional BaseKeyGenerator. If not, meta field will 
be used.
    * @return the partition path for the passed in generic record.
    */
@@ -107,18 +130,18 @@ public class KeyGenUtils {
   public static String[] extractRecordKeysByFields(String recordKey, 
List<String> fields) {
     String[] fieldKV = recordKey.split(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
     return Arrays.stream(fieldKV).map(kv -> 
kv.split(DEFAULT_COMPOSITE_KEY_FILED_VALUE, 2))
-            .filter(kvArray -> kvArray.length == 1 || fields.isEmpty() || 
(fields.contains(kvArray[0])))
-            .map(kvArray -> {
-              if (kvArray.length == 1) {
-                return kvArray[0];
-              } else if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) {
-                return null;
-              } else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) {
-                return "";
-              } else {
-                return kvArray[1];
-              }
-            }).toArray(String[]::new);
+        .filter(kvArray -> kvArray.length == 1 || fields.isEmpty() || 
(fields.contains(kvArray[0])))
+        .map(kvArray -> {
+          if (kvArray.length == 1) {
+            return kvArray[0];
+          } else if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) {
+            return null;
+          } else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) {
+            return "";
+          } else {
+            return kvArray[1];
+          }
+        }).toArray(String[]::new);
   }
 
   public static String getRecordKey(GenericRecord record, List<String> 
recordKeyFields, boolean consistentLogicalTimestampEnabled) {
@@ -144,7 +167,7 @@ public class KeyGenUtils {
   }
 
   public static String getRecordPartitionPath(GenericRecord record, 
List<String> partitionPathFields,
-      boolean hiveStylePartitioning, boolean encodePartitionPath, boolean 
consistentLogicalTimestampEnabled) {
+                                              boolean hiveStylePartitioning, 
boolean encodePartitionPath, boolean consistentLogicalTimestampEnabled) {
     if (partitionPathFields.isEmpty()) {
       return "";
     }
@@ -176,7 +199,7 @@ public class KeyGenUtils {
   }
 
   public static String getPartitionPath(GenericRecord record, String 
partitionPathField,
-      boolean hiveStylePartitioning, boolean encodePartitionPath, boolean 
consistentLogicalTimestampEnabled) {
+                                        boolean hiveStylePartitioning, boolean 
encodePartitionPath, boolean consistentLogicalTimestampEnabled) {
     String partitionPath = HoodieAvroUtils.getNestedFieldValAsString(record, 
partitionPathField, true, consistentLogicalTimestampEnabled);
     if (partitionPath == null || partitionPath.isEmpty()) {
       partitionPath = HUDI_DEFAULT_PARTITION_PATH;
@@ -193,7 +216,7 @@ public class KeyGenUtils {
   /**
    * Create a date time parser class for TimestampBasedKeyGenerator, passing 
in any configs needed.
    */
-  public static BaseHoodieDateTimeParser createDateTimeParser(TypedProperties 
props, String parserClass) throws IOException  {
+  public static BaseHoodieDateTimeParser createDateTimeParser(TypedProperties 
props, String parserClass) throws IOException {
     try {
       return (BaseHoodieDateTimeParser) ReflectionUtils.loadClass(parserClass, 
props);
     } catch (Throwable e) {
@@ -228,4 +251,22 @@ public class KeyGenUtils {
     }
     return keyGenerator;
   }
+
+  public static List<String> getRecordKeyFields(TypedProperties props) {
+    return 
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
 null))
+        .map(recordKeyConfigValue ->
+            Arrays.stream(recordKeyConfigValue.split(","))
+                .map(String::trim)
+                .filter(s -> !s.isEmpty())
+                .collect(Collectors.toList())
+        ).orElse(Collections.emptyList());
+  }
+
+  /**
+   * @param props props of interest.
+   * @return true if record keys need to be auto generated. false otherwise.
+   */
+  public static boolean enableAutoGenerateRecordKeys(TypedProperties props) {
+    return !props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
+  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
index 59c883eaadc..6f127d0db15 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
@@ -18,27 +18,22 @@
 package org.apache.hudi.keygen;
 
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import org.apache.avro.generic.GenericRecord;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * Avro simple Key generator for non-partitioned Hive Tables.
  */
 public class NonpartitionedAvroKeyGenerator extends BaseKeyGenerator {
 
-  private static final String EMPTY_PARTITION = "";
   private static final List<String> EMPTY_PARTITION_FIELD_LIST = new 
ArrayList<>();
 
   public NonpartitionedAvroKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
-        .split(",")).map(String::trim).filter(s -> 
!s.isEmpty()).collect(Collectors.toList());
+    this.recordKeyFields = KeyGenUtils.getRecordKeyFields(config);
     this.partitionPathFields = EMPTY_PARTITION_FIELD_LIST;
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
index c7398e94ece..82a137f7cb1 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
@@ -19,6 +19,7 @@ package org.apache.hudi.keygen;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import java.util.Collections;
@@ -29,19 +30,17 @@ import java.util.Collections;
 public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
 
   public SimpleAvroKeyGenerator(TypedProperties props) {
-    this(props, 
props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+    this(props, 
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
 null)),
         props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
   }
 
   SimpleAvroKeyGenerator(TypedProperties props, String partitionPathField) {
-    this(props, null, partitionPathField);
+    this(props, Option.empty(), partitionPathField);
   }
 
-  SimpleAvroKeyGenerator(TypedProperties props, String recordKeyField, String 
partitionPathField) {
+  SimpleAvroKeyGenerator(TypedProperties props, Option<String> recordKeyField, 
String partitionPathField) {
     super(props);
-    this.recordKeyFields = recordKeyField == null
-        ? Collections.emptyList()
-        : Collections.singletonList(recordKeyField);
+    this.recordKeyFields = recordKeyField.map(keyField -> 
Collections.singletonList(keyField)).orElse(Collections.emptyList());
     this.partitionPathFields = Collections.singletonList(partitionPathField);
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
index 60ccc694f94..2c75867f8da 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
@@ -76,7 +76,7 @@ public class TimestampBasedAvroKeyGenerator extends 
SimpleAvroKeyGenerator {
   }
 
   TimestampBasedAvroKeyGenerator(TypedProperties config, String 
recordKeyField, String partitionPathField) throws IOException {
-    super(config, recordKeyField, partitionPathField);
+    super(config, Option.ofNullable(recordKeyField), partitionPathField);
     String dateTimeParserClass = 
config.getString(KeyGeneratorOptions.Config.DATE_TIME_PARSER_PROP, 
HoodieDateTimeParser.class.getName());
     this.parser = KeyGenUtils.createDateTimeParser(config, 
dateTimeParserClass);
     this.inputDateTimeZone = parser.getInputDateTimeZone();
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java
index b24b9a8e2d9..f375095122d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java
@@ -21,9 +21,11 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.keygen.CustomAvroKeyGenerator;
 import org.apache.hudi.keygen.GlobalAvroDeleteKeyGenerator;
+import org.apache.hudi.keygen.AutoRecordGenWrapperAvroKeyGenerator;
 import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
@@ -71,22 +73,36 @@ public class HoodieAvroKeyGeneratorFactory {
       throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " + 
keyGeneratorType);
     }
 
+    BaseKeyGenerator keyGenerator = null;
+
     switch (keyGeneratorTypeEnum) {
       case SIMPLE:
-        return new SimpleAvroKeyGenerator(props);
+        keyGenerator = new SimpleAvroKeyGenerator(props);
+        break;
       case COMPLEX:
-        return new ComplexAvroKeyGenerator(props);
+        keyGenerator = new ComplexAvroKeyGenerator(props);
+        break;
       case TIMESTAMP:
-        return new TimestampBasedAvroKeyGenerator(props);
+        keyGenerator = new TimestampBasedAvroKeyGenerator(props);
+        break;
       case CUSTOM:
-        return new CustomAvroKeyGenerator(props);
+        keyGenerator = new CustomAvroKeyGenerator(props);
+        break;
       case NON_PARTITION:
-        return new NonpartitionedAvroKeyGenerator(props);
+        keyGenerator = new NonpartitionedAvroKeyGenerator(props);
+        break;
       case GLOBAL_DELETE:
-        return new GlobalAvroDeleteKeyGenerator(props);
+        keyGenerator = new GlobalAvroDeleteKeyGenerator(props);
+        break;
       default:
         throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " 
+ keyGeneratorType);
     }
+
+    if (KeyGenUtils.enableAutoGenerateRecordKeys(props)) {
+      return new AutoRecordGenWrapperAvroKeyGenerator(props, keyGenerator);
+    } else {
+      return keyGenerator;
+    }
   }
 
 }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
index b79404ff23e..ae0b259dd73 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.keygen;
 
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 
 import org.junit.jupiter.api.Assertions;
@@ -34,22 +35,22 @@ public class TestKeyGenUtils {
   public void testInferKeyGeneratorType() {
     assertEquals(
         KeyGeneratorType.SIMPLE,
-        KeyGenUtils.inferKeyGeneratorType("col1", "partition1"));
+        KeyGenUtils.inferKeyGeneratorType(Option.of("col1"), "partition1"));
     assertEquals(
         KeyGeneratorType.COMPLEX,
-        KeyGenUtils.inferKeyGeneratorType("col1", "partition1,partition2"));
+        KeyGenUtils.inferKeyGeneratorType(Option.of("col1"), 
"partition1,partition2"));
     assertEquals(
         KeyGeneratorType.COMPLEX,
-        KeyGenUtils.inferKeyGeneratorType("col1,col2", "partition1"));
+        KeyGenUtils.inferKeyGeneratorType(Option.of("col1,col2"), 
"partition1"));
     assertEquals(
         KeyGeneratorType.COMPLEX,
-        KeyGenUtils.inferKeyGeneratorType("col1,col2", 
"partition1,partition2"));
+        KeyGenUtils.inferKeyGeneratorType(Option.of("col1,col2"), 
"partition1,partition2"));
     assertEquals(
         KeyGeneratorType.NON_PARTITION,
-        KeyGenUtils.inferKeyGeneratorType("col1,col2", ""));
+        KeyGenUtils.inferKeyGeneratorType(Option.of("col1,col2"), ""));
     assertEquals(
         KeyGeneratorType.NON_PARTITION,
-        KeyGenUtils.inferKeyGeneratorType("col1,col2", null));
+        KeyGenUtils.inferKeyGeneratorType(Option.of("col1,col2"), null));
   }
 
   @Test
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java
index b69d84442bc..96095da3716 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java
@@ -20,9 +20,11 @@ package org.apache.hudi.keygen.factory;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.AutoRecordGenWrapperAvroKeyGenerator;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.keygen.CustomAvroKeyGenerator;
 import org.apache.hudi.keygen.GlobalAvroDeleteKeyGenerator;
+import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
@@ -32,6 +34,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -97,4 +100,14 @@ public class TestCreateAvroKeyGeneratorByTypeWithFactory {
         throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " 
+ keyGenType);
     }
   }
+
+  @Test
+  public void testAutoRecordKeyGenerator() throws IOException {
+    props = new TypedProperties();
+    props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition");
+    props.put(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, "100");
+    props.put(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 1);
+    KeyGenerator keyGenerator = 
HoodieAvroKeyGeneratorFactory.createKeyGenerator(props);
+    
Assertions.assertEquals(AutoRecordGenWrapperAvroKeyGenerator.class.getName(), 
keyGenerator.getClass().getName());
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperKeyGenerator.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperKeyGenerator.java
new file mode 100644
index 00000000000..ce767665a6f
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperKeyGenerator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.util.List;
+
+/**
+ * A wrapper key generator to intercept getRecordKey calls for auto record key 
generator.
+ * <ol>
+ *   <li>Generated keys will be unique not only w/in provided 
{@link org.apache.spark.sql.DataFrame}, but
+ *   globally unique w/in the target table</li>
+ *   <li>Generated keys have minimal overhead (to compute, persist and 
read)</li>
+ * </ol>
+ *
+ * Keys adhere to the following format:
+ *
+ * [instantTime]_[PartitionId]_[RowId]
+ *
+ * where
+ * instantTime refers to the commit time of the batch being ingested.
+ * PartitionId refers to spark's partition Id.
+ * RowId refers to the row index within the spark partition.
+ */
+public class AutoRecordGenWrapperKeyGenerator extends BuiltinKeyGenerator {
+
+  private final BuiltinKeyGenerator builtinKeyGenerator;
+  private final int partitionId;
+  private final String instantTime;
+  private int rowId;
+
+  public AutoRecordGenWrapperKeyGenerator(TypedProperties config, 
BuiltinKeyGenerator builtinKeyGenerator) {
+    super(config);
+    this.builtinKeyGenerator = builtinKeyGenerator;
+    this.rowId = 0;
+    this.partitionId = 
config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
+    this.instantTime = 
config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
+  }
+
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++);
+  }
+
+  @Override
+  public String getPartitionPath(GenericRecord record) {
+    return builtinKeyGenerator.getPartitionPath(record);
+  }
+
+  @Override
+  public String getRecordKey(Row row) {
+    return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++);
+  }
+
+  @Override
+  public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+    return UTF8String.fromString(HoodieRecord.generateSequenceId(instantTime, 
partitionId, rowId++));
+  }
+
+  @Override
+  public String getPartitionPath(Row row) {
+    return builtinKeyGenerator.getPartitionPath(row);
+  }
+
+  @Override
+  public UTF8String getPartitionPath(InternalRow internalRow, StructType 
schema) {
+    return builtinKeyGenerator.getPartitionPath(internalRow, schema);
+  }
+
+  @Override
+  public List<String> getRecordKeyFieldNames() {
+    return builtinKeyGenerator.getRecordKeyFieldNames();
+  }
+
+  public List<String> getPartitionPathFields() {
+    return builtinKeyGenerator.getPartitionPathFields();
+  }
+
+  public boolean isConsistentLogicalTimestampEnabled() {
+    return builtinKeyGenerator.isConsistentLogicalTimestampEnabled();
+  }
+
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
index c9cff284e80..d00ca066ced 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
@@ -41,10 +41,7 @@ public class ComplexKeyGenerator extends BuiltinKeyGenerator 
{
 
   public ComplexKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(FIELDS_SEP))
-        .map(String::trim)
-        .filter(s -> !s.isEmpty())
-        .collect(Collectors.toList());
+    this.recordKeyFields = KeyGenUtils.getRecordKeyFields(props);
     this.partitionPathFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(FIELDS_SEP))
         .map(String::trim)
         .filter(s -> !s.isEmpty())
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
index fcd94bb4f15..1526164207f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
@@ -60,17 +60,17 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator 
{
     // NOTE: We have to strip partition-path configuration, since it could 
only be interpreted by
     //       this key-gen
     super(stripPartitionPathConfig(props));
-    this.recordKeyFields =
-        
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
-            .map(String::trim)
-            .collect(Collectors.toList());
+    this.recordKeyFields = 
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
 null))
+        .map(recordKeyConfigValue ->
+            Arrays.stream(recordKeyConfigValue.split(","))
+                .map(String::trim)
+                .collect(Collectors.toList())
+        ).orElse(Collections.emptyList());
     String partitionPathFields = 
props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
     this.partitionPathFields = partitionPathFields == null
         ? Collections.emptyList()
         : 
Arrays.stream(partitionPathFields.split(",")).map(String::trim).collect(Collectors.toList());
     this.customAvroKeyGenerator = new CustomAvroKeyGenerator(props);
-
-    validateRecordKeyFields();
   }
 
   @Override
@@ -86,7 +86,7 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
   @Override
   public String getRecordKey(Row row) {
     return getRecordKeyFieldNames().size() == 1
-        ? new SimpleKeyGenerator(config, 
config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()), 
null).getRecordKey(row)
+        ? new SimpleKeyGenerator(config, 
Option.ofNullable(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())),
 null).getRecordKey(row)
         : new ComplexKeyGenerator(config).getRecordKey(row);
   }
 
@@ -155,12 +155,6 @@ public class CustomKeyGenerator extends 
BuiltinKeyGenerator {
     return partitionPath.toString();
   }
 
-  private void validateRecordKeyFields() {
-    if (getRecordKeyFieldNames() == null || 
getRecordKeyFieldNames().isEmpty()) {
-      throw new HoodieKeyException("Unable to find field names for record key 
in cfg");
-    }
-  }
-
   private static TypedProperties stripPartitionPathConfig(TypedProperties 
props) {
     TypedProperties filtered = new TypedProperties(props);
     // NOTE: We have to stub it out w/ empty string, since we properties are:
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
index 7fcc16094ea..8a229384307 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.keygen;
 
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.sql.Row;
@@ -28,7 +27,6 @@ import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -40,7 +38,7 @@ public class GlobalDeleteKeyGenerator extends 
BuiltinKeyGenerator {
   private final GlobalAvroDeleteKeyGenerator globalAvroDeleteKeyGenerator;
   public GlobalDeleteKeyGenerator(TypedProperties config) {
     super(config);
-    this.recordKeyFields = 
Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","));
+    this.recordKeyFields = KeyGenUtils.getRecordKeyFields(config);
     this.globalAvroDeleteKeyGenerator = new 
GlobalAvroDeleteKeyGenerator(config);
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
index 100bcc2cd7f..2854088c47c 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
@@ -18,9 +18,9 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
@@ -29,7 +29,6 @@ import org.apache.spark.unsafe.types.UTF8String;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * Simple Key generator for non-partitioned Hive Tables.
@@ -40,10 +39,7 @@ public class NonpartitionedKeyGenerator extends 
BuiltinKeyGenerator {
 
   public NonpartitionedKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
-        .split(","))
-        .map(String::trim)
-        .collect(Collectors.toList());
+    this.recordKeyFields = KeyGenUtils.getRecordKeyFields(props);
     this.partitionPathFields = Collections.emptyList();
     this.nonpartitionedAvroKeyGenerator = new 
NonpartitionedAvroKeyGenerator(props);
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
index 4b1a5e5cb44..c897d6b657e 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.keygen;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -38,21 +39,21 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator 
{
   private final SimpleAvroKeyGenerator simpleAvroKeyGenerator;
 
   public SimpleKeyGenerator(TypedProperties props) {
-    this(props, 
props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+    this(props, 
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
 null)),
         props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
   }
 
   SimpleKeyGenerator(TypedProperties props, String partitionPathField) {
-    this(props, null, partitionPathField);
+    this(props, Option.empty(), partitionPathField);
   }
 
-  SimpleKeyGenerator(TypedProperties props, String recordKeyField, String 
partitionPathField) {
+  SimpleKeyGenerator(TypedProperties props, Option<String> recordKeyField, 
String partitionPathField) {
     super(props);
     // Make sure key-generator is configured properly
     validateRecordKey(recordKeyField);
     validatePartitionPath(partitionPathField);
 
-    this.recordKeyFields = recordKeyField == null ? Collections.emptyList() : 
Collections.singletonList(recordKeyField);
+    this.recordKeyFields = !recordKeyField.isPresent() ? 
Collections.emptyList() : Collections.singletonList(recordKeyField.get());
     this.partitionPathFields = partitionPathField == null ? 
Collections.emptyList() : Collections.singletonList(partitionPathField);
     this.simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, 
recordKeyField, partitionPathField);
   }
@@ -116,10 +117,10 @@ public class SimpleKeyGenerator extends 
BuiltinKeyGenerator {
         String.format("Single partition-path field is expected; provided 
(%s)", partitionPathField));
   }
 
-  private static void validateRecordKey(String recordKeyField) {
-    checkArgument(recordKeyField == null || !recordKeyField.isEmpty(),
+  private void validateRecordKey(Option<String> recordKeyField) {
+    checkArgument(!recordKeyField.isPresent() || 
!recordKeyField.get().isEmpty(),
         "Record key field has to be non-empty!");
-    checkArgument(recordKeyField == null || 
!recordKeyField.contains(FIELDS_SEP),
+    checkArgument(!recordKeyField.isPresent() || 
!recordKeyField.get().contains(FIELDS_SEP),
         String.format("Single record-key field is expected; provided (%s)", 
recordKeyField));
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
index fa36f2152cb..470af045485 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.keygen;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieKeyGeneratorException;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.spark.sql.Row;
@@ -41,7 +42,7 @@ public class TimestampBasedKeyGenerator extends 
SimpleKeyGenerator {
   private final TimestampBasedAvroKeyGenerator timestampBasedAvroKeyGenerator;
 
   public TimestampBasedKeyGenerator(TypedProperties config) throws IOException 
{
-    this(config, 
config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+    this(config, 
config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null),
         config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
   }
 
@@ -50,7 +51,7 @@ public class TimestampBasedKeyGenerator extends 
SimpleKeyGenerator {
   }
 
   TimestampBasedKeyGenerator(TypedProperties config, String recordKeyField, 
String partitionPathField) throws IOException {
-    super(config, recordKeyField, partitionPathField);
+    super(config, Option.ofNullable(recordKeyField), partitionPathField);
     timestampBasedAvroKeyGenerator = new 
TimestampBasedAvroKeyGenerator(config, recordKeyField, partitionPathField);
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
index b9c4043ecda..ae961b4dcf5 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
@@ -19,13 +19,17 @@
 package org.apache.hudi.keygen.factory;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.BuiltinKeyGenerator;
 import org.apache.hudi.keygen.ComplexKeyGenerator;
 import org.apache.hudi.keygen.CustomKeyGenerator;
 import org.apache.hudi.keygen.GlobalDeleteKeyGenerator;
+import org.apache.hudi.keygen.AutoRecordGenWrapperKeyGenerator;
+import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
@@ -72,8 +76,15 @@ public class HoodieSparkKeyGeneratorFactory {
 
   public static KeyGenerator createKeyGenerator(TypedProperties props) throws 
IOException {
     String keyGeneratorClass = getKeyGeneratorClassName(props);
+    boolean autoRecordKeyGen = KeyGenUtils.enableAutoGenerateRecordKeys(props);
     try {
-      return (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, 
props);
+      KeyGenerator keyGenerator = (KeyGenerator) 
ReflectionUtils.loadClass(keyGeneratorClass, props);
+      if (autoRecordKeyGen) {
+        return new AutoRecordGenWrapperKeyGenerator(props, 
(BuiltinKeyGenerator) keyGenerator);
+      } else {
+        // the user supplied their own key generator; return it as-is.
+        return keyGenerator;
+      }
     } catch (Throwable e) {
       throw new IOException("Could not load key generator class " + 
keyGeneratorClass, e);
     }
@@ -112,7 +123,7 @@ public class HoodieSparkKeyGeneratorFactory {
   public static KeyGeneratorType 
inferKeyGeneratorTypeFromWriteConfig(TypedProperties props) {
     String partitionFields = 
props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), null);
     String recordsKeyFields = 
props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null);
-    return inferKeyGeneratorType(recordsKeyFields, partitionFields);
+    return inferKeyGeneratorType(Option.ofNullable(recordsKeyFields), 
partitionFields);
   }
 
   public static String getKeyGeneratorClassName(TypedProperties props) {
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index e239db1b5a5..835a1251aaf 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -27,10 +27,11 @@ import org.apache.hudi.common.util.ReflectionUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.index.SparkHoodieIndexFactory
-import org.apache.hudi.keygen.{BuiltinKeyGenerator, SparkKeyGeneratorInterface}
+import org.apache.hudi.keygen.{AutoRecordGenWrapperKeyGenerator, 
BuiltinKeyGenerator, KeyGenUtils, SparkKeyGeneratorInterface}
 import 
org.apache.hudi.table.action.commit.{BulkInsertDataInternalWriterHelper, 
ParallelismHelper}
 import org.apache.hudi.table.{BulkInsertPartitioner, HoodieTable}
 import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, 
getNestedInternalRowValue}
@@ -60,9 +61,11 @@ object HoodieDatasetBulkInsertHelper
   def prepareForBulkInsert(df: DataFrame,
                            config: HoodieWriteConfig,
                            partitioner: BulkInsertPartitioner[Dataset[Row]],
-                           shouldDropPartitionColumns: Boolean): Dataset[Row] 
= {
+                           shouldDropPartitionColumns: Boolean,
+                           instantTime: String): Dataset[Row] = {
     val populateMetaFields = config.populateMetaFields()
     val schema = df.schema
+    val autoGenerateRecordKeys = 
KeyGenUtils.enableAutoGenerateRecordKeys(config.getProps)
 
     val metaFields = Seq(
       StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
@@ -79,11 +82,22 @@ object HoodieDatasetBulkInsertHelper
 
       val prependedRdd: RDD[InternalRow] =
         df.queryExecution.toRdd.mapPartitions { iter =>
-          val keyGenerator =
-            ReflectionUtils.loadClass(keyGeneratorClassName, new 
TypedProperties(config.getProps))
-              .asInstanceOf[SparkKeyGeneratorInterface]
+          val typedProps = new TypedProperties(config.getProps)
+          if (autoGenerateRecordKeys) {
+            
typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(TaskContext.getPartitionId()))
+            
typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, 
instantTime)
+          }
+          val sparkKeyGenerator =
+            ReflectionUtils.loadClass(keyGeneratorClassName, typedProps)
+              .asInstanceOf[BuiltinKeyGenerator]
+              val keyGenerator: BuiltinKeyGenerator = if 
(autoGenerateRecordKeys) {
+                new AutoRecordGenWrapperKeyGenerator(typedProps, 
sparkKeyGenerator).asInstanceOf[BuiltinKeyGenerator]
+              } else {
+                sparkKeyGenerator
+              }
 
           iter.map { row =>
+            // auto generate record keys if needed
             val recordKey = keyGenerator.getRecordKey(row, schema)
             val partitionPath = keyGenerator.getPartitionPath(row, schema)
             val commitTimestamp = UTF8String.EMPTY_UTF8
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
index a767fd3c5bf..932fa0096cf 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
@@ -18,9 +18,12 @@
 package org.apache.hudi.util
 
 import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
-import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, 
CustomKeyGenerator, KeyGenerator}
+import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, 
CustomKeyGenerator, GlobalAvroDeleteKeyGenerator, GlobalDeleteKeyGenerator, 
KeyGenerator, NonpartitionedAvroKeyGenerator, NonpartitionedKeyGenerator, 
SimpleKeyGenerator}
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+import 
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName
 
 import scala.collection.JavaConverters._
 
@@ -31,28 +34,30 @@ object SparkKeyGenUtils {
    * @return partition columns
    */
   def getPartitionColumns(props: TypedProperties): String = {
-    val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
-    getPartitionColumns(keyGenerator, props)
+    val keyGeneratorClass = getKeyGeneratorClassName(props)
+    getPartitionColumns(keyGeneratorClass, props)
   }
 
   /**
-   * @param keyGen key generator
+   * @param keyGen key generator class name
    * @return partition columns
    */
-  def getPartitionColumns(keyGen: KeyGenerator, typedProperties: 
TypedProperties): String = {
-    keyGen match {
-      // For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path 
filed format
-      // is: "field_name: field_type", we extract the field_name from the 
partition path field.
-      case c: BaseKeyGenerator
-        if c.isInstanceOf[CustomKeyGenerator] || 
c.isInstanceOf[CustomAvroKeyGenerator] =>
-        c.getPartitionPathFields.asScala.map(pathField =>
-          pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
-            .headOption.getOrElse(s"Illegal partition path field format: 
'$pathField' for ${c.getClass.getSimpleName}"))
-          .mkString(",")
-
-      case b: BaseKeyGenerator => 
b.getPartitionPathFields.asScala.mkString(",")
-      case _ => 
typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
+  def getPartitionColumns(keyGenClass: String, typedProperties: 
TypedProperties): String = {
+    // For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path 
field format
+    // is: "field_name: field_type", we extract the field_name from the 
partition path field.
+    if (keyGenClass.equals(classOf[CustomKeyGenerator].getCanonicalName) || 
keyGenClass.equals(classOf[CustomAvroKeyGenerator].getCanonicalName)) {
+      
typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
+        .split(",").map(pathField => {
+        pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
+          .headOption.getOrElse(s"Illegal partition path field format: 
'$pathField' for ${keyGenClass}")}).mkString(",")
+    } else if 
(keyGenClass.equals(classOf[NonpartitionedKeyGenerator].getCanonicalName)
+      || 
keyGenClass.equals(classOf[NonpartitionedAvroKeyGenerator].getCanonicalName)
+      || keyGenClass.equals(classOf[GlobalDeleteKeyGenerator].getCanonicalName)
+      || 
keyGenClass.equals(classOf[GlobalAvroDeleteKeyGenerator].getCanonicalName)) {
+      StringUtils.EMPTY_STRING
+    } else {
+      
checkArgument(typedProperties.containsKey(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()),
 "Partition path needs to be set")
+      
typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
     }
   }
-
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index f4471e89a58..a5e4456381b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -452,7 +452,6 @@ public class HoodieTableConfig extends HoodieConfig {
       if (hoodieConfig.contains(TIMELINE_TIMEZONE)) {
         
HoodieInstantTimeGenerator.setCommitTimeZone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getString(TIMELINE_TIMEZONE)));
       }
-
       hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS);
 
       storeProperties(hoodieConfig.getProps(), outputStream);
@@ -520,9 +519,13 @@ public class HoodieTableConfig extends HoodieConfig {
   }
 
   public Option<String[]> getRecordKeyFields() {
-    String keyFieldsValue = getStringOrDefault(RECORDKEY_FIELDS, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
-    return Option.of(Arrays.stream(keyFieldsValue.split(","))
-        .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new 
String[] {}));
+    String keyFieldsValue = getStringOrDefault(RECORDKEY_FIELDS, null);
+    if (keyFieldsValue == null) {
+      return Option.empty();
+    } else {
+      return Option.of(Arrays.stream(keyFieldsValue.split(","))
+          .filter(p -> p.length() > 
0).collect(Collectors.toList()).toArray(new String[] {}));
+    }
   }
 
   public Option<String[]> getPartitionFields() {
@@ -639,6 +642,13 @@ public class HoodieTableConfig extends HoodieConfig {
     return getStringOrDefault(RECORDKEY_FIELDS, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
   }
 
+  /**
+   * @returns the record key field prop.
+   */
+  public String getRawRecordKeyFieldProp() {
+    return getStringOrDefault(RECORDKEY_FIELDS, null);
+  }
+
   public boolean isCDCEnabled() {
     return getBooleanOrDefault(CDC_ENABLED);
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java 
b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
index d0baa903919..c5a13dd49c1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
@@ -32,6 +32,7 @@ import java.util.List;
  */
 public abstract class BaseKeyGenerator extends KeyGenerator {
 
+  public static final String EMPTY_PARTITION = "";
   protected List<String> recordKeyFields;
   protected List<String> partitionPathFields;
   protected final boolean encodePartitionPath;
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index 632bbecf10d..b6a155c133d 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -21,6 +21,7 @@ package org.apache.hudi.integ.testsuite;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.deltastreamer.DeltaSync;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
@@ -78,7 +79,8 @@ public class HoodieDeltaStreamerWrapper extends 
HoodieDeltaStreamer {
   public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> 
fetchSource() throws Exception {
     DeltaSync service = getDeltaSync();
     service.refreshTimeline();
-    return service.readFromSource(service.getCommitsTimelineOpt());
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return service.readFromSource(service.getCommitsTimelineOpt(), 
instantTime);
   }
 
   public DeltaSync getDeltaSync() {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
new file mode 100644
index 00000000000..9b3a10a3f62
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.DataSourceWriteOptions.INSERT_DROP_DUPS
+import org.apache.hudi.common.config.HoodieConfig
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieKeyGeneratorException
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+
+object AutoRecordKeyGenerationUtils {
+
+  def mayBeValidateParamsForAutoGenerationOfRecordKeys(parameters: Map[String, 
String], hoodieConfig: HoodieConfig): Unit = {
+    val autoGenerateRecordKeys = 
!parameters.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) // if 
record key is not configured,
+    // hudi will auto generate.
+    if (autoGenerateRecordKeys) {
+      // de-dup is not supported with auto generation of record keys
+      if (parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
+        HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean) {
+        throw new HoodieKeyGeneratorException("Enabling " + 
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key() + " is not supported with auto 
generation of record keys ")
+      }
+      // drop dupes is not supported
+      if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
+        throw new HoodieKeyGeneratorException("Enabling " + 
INSERT_DROP_DUPS.key() + " is not supported with auto generation of record keys 
")
+      }
+      // virtual keys are not supported with auto generation of record keys.
+      if (!parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(), 
HoodieTableConfig.POPULATE_META_FIELDS.defaultValue().toString).toBoolean) {
+        throw new HoodieKeyGeneratorException("Disabling " + 
HoodieTableConfig.POPULATE_META_FIELDS.key() + " is not supported with auto 
generation of record keys")
+      }
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 31e5512a3f9..512e84f3897 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -876,8 +876,8 @@ object DataSourceOptionsHelper {
    */
   def fetchMissingWriteConfigsFromTableConfig(tableConfig: HoodieTableConfig, 
params: Map[String, String]) : Map[String, String] = {
     val missingWriteConfigs = scala.collection.mutable.Map[String, String]()
-    if (!params.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) && 
tableConfig.getRecordKeyFieldProp != null) {
-      missingWriteConfigs ++= 
Map(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key() -> 
tableConfig.getRecordKeyFieldProp)
+    if (!params.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) && 
tableConfig.getRawRecordKeyFieldProp != null) {
+      missingWriteConfigs ++= 
Map(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key() -> 
tableConfig.getRawRecordKeyFieldProp)
     }
     if (!params.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()) 
&& tableConfig.getPartitionFieldProp != null) {
       missingWriteConfigs ++= 
Map(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key() -> 
tableConfig.getPartitionFieldProp)
@@ -912,7 +912,7 @@ object DataSourceOptionsHelper {
   }
 
   def inferKeyGenClazz(recordsKeyFields: String, partitionFields: String): 
String = {
-    getKeyGeneratorClassNameFromType(inferKeyGeneratorType(recordsKeyFields, 
partitionFields))
+    
getKeyGeneratorClassNameFromType(inferKeyGeneratorType(Option.ofNullable(recordsKeyFields),
 partitionFields))
   }
 
   implicit def convert[T, U](prop: ConfigProperty[T])(implicit converter: T => 
U): ConfigProperty[U] = {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index a122e81aa73..9148bf15db1 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -21,6 +21,7 @@ import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericData, GenericRecord}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
+import 
org.apache.hudi.AutoRecordKeyGenerationUtils.mayBeValidateParamsForAutoGenerationOfRecordKeys
 import org.apache.hudi.AvroConversionUtils.{convertStructTypeToAvroSchema, 
getAvroRecordNameAndNamespace}
 import 
org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
 import org.apache.hudi.DataSourceWriteOptions._
@@ -50,8 +51,10 @@ import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import 
org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileNullability
 import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, 
SerDeHelper}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
-import org.apache.hudi.keygen.{SparkKeyGeneratorInterface, 
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import 
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.{createKeyGenerator,
 getKeyGeneratorClassName}
+import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils, 
SparkKeyGeneratorInterface, TimestampBasedAvroKeyGenerator, 
TimestampBasedKeyGenerator}
 import org.apache.hudi.metrics.Metrics
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.sync.common.util.SyncUtilHelpers
@@ -65,7 +68,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.{SPARK_VERSION, SparkContext}
+import org.apache.spark.{SPARK_VERSION, SparkContext, TaskContext}
 import org.slf4j.LoggerFactory
 
 import java.util.function.BiConsumer
@@ -176,7 +179,6 @@ object HoodieSparkSqlWriter {
         jsc.setLocalProperty("spark.scheduler.pool", 
SparkConfigs.SPARK_DATASOURCE_WRITER_POOL_NAME)
       }
     }
-    val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new 
TypedProperties(hoodieConfig.getProps))
 
     if (mode == SaveMode.Ignore && tableExists) {
       log.warn(s"hoodie table at $basePath already exists. Ignoring & not 
performing actual writes.")
@@ -184,7 +186,8 @@ object HoodieSparkSqlWriter {
     } else {
       // Handle various save modes
       handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, 
tblName, operation, fs)
-      val partitionColumns = 
SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
+      val partitionColumns = 
SparkKeyGenUtils.getPartitionColumns(getKeyGeneratorClassName(new 
TypedProperties(hoodieConfig.getProps)),
+        toProperties(parameters))
       val timelineTimeZone = 
HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))
       val tableMetaClient = if (tableExists) {
         HoodieInstantTimeGenerator.setCommitTimeZone(timelineTimeZone)
@@ -256,6 +259,7 @@ object HoodieSparkSqlWriter {
           case WriteOperationType.DELETE =>
             val genericRecords = HoodieSparkUtils.createRdd(df, 
avroRecordName, avroRecordNamespace)
             // Convert to RDD[HoodieKey]
+            val keyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(new 
TypedProperties(hoodieConfig.getProps))
             val hoodieKeysToDelete = genericRecords.map(gr => 
keyGenerator.getKey(gr)).toJavaRDD()
 
             if (!tableExists) {
@@ -286,6 +290,7 @@ object HoodieSparkSqlWriter {
               throw new HoodieException(s"hoodie table at $basePath does not 
exist")
             }
 
+            val keyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(new 
TypedProperties(hoodieConfig.getProps))
             // Get list of partitions to delete
             val partitionsToDelete = if 
(parameters.containsKey(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key())) {
               val partitionColsToDelete = 
parameters(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key()).split(",")
@@ -317,6 +322,7 @@ object HoodieSparkSqlWriter {
             val writerSchema = deduceWriterSchema(sourceSchema, 
latestTableSchemaOpt, internalSchemaOpt, parameters)
 
             validateSchemaForHoodieIsDeleted(writerSchema)
+            mayBeValidateParamsForAutoGenerationOfRecordKeys(parameters, 
hoodieConfig)
 
             // Short-circuit if bulk_insert via row is enabled.
             // scalastyle:off
@@ -355,7 +361,7 @@ object HoodieSparkSqlWriter {
             // Convert to RDD[HoodieRecord]
             val hoodieRecords =
               createHoodieRecordRdd(df, writeConfig, parameters, 
avroRecordName, avroRecordNamespace, writerSchema,
-                dataFileSchema, operation)
+                dataFileSchema, operation, instantTime)
 
             if (isAsyncCompactionEnabled(client, tableConfig, parameters, 
jsc.hadoopConfiguration())) {
               asyncCompactionTriggerFn.get.apply(client)
@@ -812,7 +818,8 @@ object HoodieSparkSqlWriter {
     }
 
     val shouldDropPartitionColumns = 
hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
-    val hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(df, 
writeConfig, bulkInsertPartitionerRows, shouldDropPartitionColumns)
+    val hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(df, 
writeConfig, bulkInsertPartitionerRows, shouldDropPartitionColumns,
+      instantTime)
 
     val optsOverrides = Map(
       HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED ->
@@ -1098,10 +1105,11 @@ object HoodieSparkSqlWriter {
                                     recordNameSpace: String,
                                     writerSchema: Schema,
                                     dataFileSchema: Schema,
-                                    operation: WriteOperationType) = {
+                                    operation: WriteOperationType,
+                                    instantTime: String) = {
     val shouldDropPartitionColumns = 
config.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
-    val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new 
TypedProperties(config.getProps))
     val recordType = config.getRecordMerger.getRecordType
+    val autoGenerateRecordKeys : Boolean = 
!parameters.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
 
     val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
       operation.equals(WriteOperationType.UPSERT) ||
@@ -1121,24 +1129,36 @@ object HoodieSparkSqlWriter {
           Some(writerSchema))
 
         avroRecords.mapPartitions(it => {
+          val sparkPartitionId = TaskContext.getPartitionId()
+          val keyGenProps = new TypedProperties(config.getProps)
+          if (autoGenerateRecordKeys) {
+            
keyGenProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(sparkPartitionId))
+            
keyGenProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, 
instantTime)
+          }
+          val keyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps)
+            .asInstanceOf[BaseKeyGenerator]
+
           val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)
           val consistentLogicalTimestampEnabled = parameters.getOrElse(
             
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
             
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean
 
-          it.map { avroRecord =>
+          // handle dropping partition columns
+          it.map { avroRec =>
             val processedRecord = if (shouldDropPartitionColumns) {
-              HoodieAvroUtils.rewriteRecord(avroRecord, dataFileSchema)
+              HoodieAvroUtils.rewriteRecord(avroRec, dataFileSchema)
             } else {
-              avroRecord
+              avroRec
             }
+
+            val hoodieKey = new HoodieKey(keyGenerator.getRecordKey(avroRec), 
keyGenerator.getPartitionPath(avroRec))
             val hoodieRecord = if (shouldCombine) {
-              val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRecord, 
config.getString(PRECOMBINE_FIELD),
+              val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec, 
config.getString(PRECOMBINE_FIELD),
                 false, 
consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]]
-              DataSourceUtils.createHoodieRecord(processedRecord, orderingVal, 
keyGenerator.getKey(avroRecord),
+              DataSourceUtils.createHoodieRecord(processedRecord, orderingVal, 
hoodieKey,
                 config.getString(PAYLOAD_CLASS_NAME))
             } else {
-              DataSourceUtils.createHoodieRecord(processedRecord, 
keyGenerator.getKey(avroRecord),
+              DataSourceUtils.createHoodieRecord(processedRecord, hoodieKey,
                 config.getString(PAYLOAD_CLASS_NAME))
             }
             hoodieRecord
@@ -1146,13 +1166,19 @@ object HoodieSparkSqlWriter {
         }).toJavaRDD()
 
       case HoodieRecord.HoodieRecordType.SPARK =>
-        val sparkKeyGenerator = 
keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
         val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)
         val dataFileStructType = 
HoodieInternalRowUtils.getCachedSchema(dataFileSchema)
         val writerStructType = 
HoodieInternalRowUtils.getCachedSchema(writerSchema)
         val sourceStructType = df.schema
 
         df.queryExecution.toRdd.mapPartitions { it =>
+          val sparkPartitionId = TaskContext.getPartitionId()
+          val keyGenProps = new TypedProperties(config.getProps)
+          if (autoGenerateRecordKeys) {
+            
keyGenProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(sparkPartitionId))
+            
keyGenProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, 
instantTime)
+          }
+          val sparkKeyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
           val targetStructType = if (shouldDropPartitionColumns) 
dataFileStructType else writerStructType
           // NOTE: To make sure we properly transform records
           val targetStructTypeRowWriter = 
getCachedUnsafeRowWriter(sourceStructType, targetStructType)
@@ -1161,7 +1187,6 @@ object HoodieSparkSqlWriter {
             val recordKey = sparkKeyGenerator.getRecordKey(sourceRow, 
sourceStructType)
             val partitionPath = sparkKeyGenerator.getPartitionPath(sourceRow, 
sourceStructType)
             val key = new HoodieKey(recordKey.toString, partitionPath.toString)
-
             val targetRow = targetStructTypeRowWriter(sourceRow)
 
             new HoodieSparkRecord(key, targetRow, dataFileStructType, false)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index a2acca66114..09a6d873e8b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -146,8 +146,11 @@ object HoodieWriterUtils {
       if (null != tableConfig) {
         val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null)
         val tableConfigRecordKey = 
tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)
-        if (null != datasourceRecordKey && null != tableConfigRecordKey
-          && datasourceRecordKey != tableConfigRecordKey) {
+        if ((null != datasourceRecordKey && null != tableConfigRecordKey
+          && datasourceRecordKey != tableConfigRecordKey) || (null != 
datasourceRecordKey && datasourceRecordKey.nonEmpty
+          && tableConfigRecordKey == null)) {
+          // if both are non null, they should match.
+          // if incoming record key is non empty, table config should also be 
non empty.
           
diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n")
         }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 5ee0089f104..b8d93fa51e1 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -25,6 +25,7 @@ import 
org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper}
 import org.apache.spark.internal.Logging
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index bb682cf9b5f..239ec31eb65 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -196,10 +196,11 @@ object HoodieOptionConfig {
     // validate primary key
     val primaryKeys = sqlOptions.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)
       .map(_.split(",").filter(_.length > 0))
-    ValidationUtils.checkArgument(primaryKeys.nonEmpty, "No `primaryKey` is 
specified.")
-    primaryKeys.get.foreach { primaryKey =>
-      ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, 
getRootLevelFieldName(primaryKey))),
-        s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.")
+    if (primaryKeys.isDefined) {
+      primaryKeys.get.foreach { primaryKey =>
+        ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, 
getRootLevelFieldName(primaryKey))),
+          s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.")
+      }
     }
 
     // validate preCombine key
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index ffc4079824f..a2194feb31b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -196,6 +196,12 @@ trait ProvidesHoodieConfig extends Logging {
       HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> 
hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
       HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> 
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
     )
+    // for auto record key gen
+    val recordKeyConfigValue = if (hoodieCatalogTable.primaryKeys.length > 1) {
+      hoodieCatalogTable.primaryKeys.mkString(",")
+    } else {
+      null
+    }
 
     val overridingOpts = extraOptions ++ Map(
       "path" -> path,
@@ -204,7 +210,7 @@ trait ProvidesHoodieConfig extends Logging {
       OPERATION.key -> operation,
       HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
       URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
-      RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+      RECORDKEY_FIELD.key -> recordKeyConfigValue,
       PRECOMBINE_FIELD.key -> preCombineField,
       PARTITIONPATH_FIELD.key -> partitionFieldsStr
     )
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index b94705a9a2a..04f1fbd5ba0 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -49,7 +49,12 @@ class SqlKeyGenerator(props: TypedProperties) extends 
BuiltinKeyGenerator(props)
     }
   }
 
-  private lazy val complexKeyGen = new ComplexKeyGenerator(props)
+  private lazy val autoRecordKeyGen = 
KeyGenUtils.enableAutoGenerateRecordKeys(props)
+  private lazy val complexKeyGen = if (autoRecordKeyGen) {
+    new AutoRecordGenWrapperKeyGenerator(props, new ComplexKeyGenerator(props))
+  } else {
+    new ComplexKeyGenerator(props)
+  }
   private lazy val originalKeyGen =
     Option(props.getString(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME, null))
       .map { originalKeyGenClassName =>
@@ -61,7 +66,12 @@ class SqlKeyGenerator(props: TypedProperties) extends 
BuiltinKeyGenerator(props)
         keyGenProps.remove(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME)
         keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, 
convertedKeyGenClassName)
 
-        
KeyGenUtils.createKeyGeneratorByClassName(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
+        val keyGenerator = 
KeyGenUtils.createKeyGeneratorByClassName(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
+        if (autoRecordKeyGen) {
+          new AutoRecordGenWrapperKeyGenerator(keyGenProps, 
keyGenerator.asInstanceOf[BuiltinKeyGenerator])
+        } else {
+          keyGenerator
+        }
       }
 
   override def getRecordKey(record: GenericRecord): String =
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
index 17c6f23089e..1038e0c9226 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
@@ -129,7 +129,7 @@ public class TestHoodieDatasetBulkInsertHelper extends 
HoodieSparkClientTestBase
     List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = 
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-        new NonSortPartitionerWithRows(), false);
+        new NonSortPartitionerWithRows(), false, "0000000001");
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), 10);
@@ -168,7 +168,7 @@ public class TestHoodieDatasetBulkInsertHelper extends 
HoodieSparkClientTestBase
         .build();
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = 
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-        new NonSortPartitionerWithRows(), false);
+        new NonSortPartitionerWithRows(), false, "000001111");
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), 10);
@@ -205,7 +205,7 @@ public class TestHoodieDatasetBulkInsertHelper extends 
HoodieSparkClientTestBase
     rows.addAll(updates);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = 
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-        new NonSortPartitionerWithRows(), false);
+        new NonSortPartitionerWithRows(), false, "000001111");
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), enablePreCombine ? 10 : 15);
@@ -309,7 +309,7 @@ public class TestHoodieDatasetBulkInsertHelper extends 
HoodieSparkClientTestBase
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     try {
       Dataset<Row> preparedDF = 
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false);
+          new NonSortPartitionerWithRows(), false, "000001111");
       preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
@@ -321,19 +321,7 @@ public class TestHoodieDatasetBulkInsertHelper extends 
HoodieSparkClientTestBase
     dataset = sqlContext.createDataFrame(rows, structType);
     try {
       Dataset<Row> preparedDF = 
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false);
-      preparedDF.count();
-      fail("Should have thrown exception");
-    } catch (Exception e) {
-      // ignore
-    }
-
-    config = getConfigBuilder(schemaStr).withProps(getProps(false, true, 
false, true)).build();
-    rows = DataSourceTestUtils.generateRandomRows(10);
-    dataset = sqlContext.createDataFrame(rows, structType);
-    try {
-      Dataset<Row> preparedDF = 
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false);
+          new NonSortPartitionerWithRows(), false, "000001111");
       preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
@@ -345,7 +333,7 @@ public class TestHoodieDatasetBulkInsertHelper extends 
HoodieSparkClientTestBase
     dataset = sqlContext.createDataFrame(rows, structType);
     try {
       Dataset<Row> preparedDF = 
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false);
+          new NonSortPartitionerWithRows(), false, "000001111");
       preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
index caed61249a1..d9d1e51059b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
@@ -77,7 +77,11 @@ public class TestComplexKeyGenerator extends 
KeyGeneratorTestUtilities {
 
   @Test
   public void testNullRecordKeyFields() {
-    Assertions.assertThrows(IllegalArgumentException.class, () -> new 
ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+    GenericRecord record = getRecord();
+    Assertions.assertThrows(StringIndexOutOfBoundsException.class, () ->   {
+      ComplexKeyGenerator keyGenerator = new 
ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp());
+      keyGenerator.getRecordKey(record);
+    });
   }
 
   @Test
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
index 311356a0f71..e001bfc13f5 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
@@ -253,29 +253,23 @@ public class TestCustomKeyGenerator extends 
KeyGeneratorTestUtilities {
   public void testNoRecordKeyFieldProp(boolean useKeyGeneratorClassName) {
     TypedProperties propsWithoutRecordKeyFieldProps = 
getPropsWithoutRecordKeyFieldProps(useKeyGeneratorClassName);
     try {
-      BuiltinKeyGenerator keyGenerator =
-          (BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(propsWithoutRecordKeyFieldProps);
+      BuiltinKeyGenerator keyGenerator = new 
CustomKeyGenerator(propsWithoutRecordKeyFieldProps);
 
       keyGenerator.getKey(getRecord());
       Assertions.fail("should fail when record key field is not provided!");
     } catch (Exception e) {
       if (useKeyGeneratorClassName) {
         // "Property hoodie.datasource.write.recordkey.field not found" 
exception cause CustomKeyGenerator init fail
-        Assertions.assertTrue(e
-            .getCause()
-            .getCause()
-            .getCause()
-            .getMessage()
-            .contains("Property hoodie.datasource.write.recordkey.field not 
found"));
+        Assertions.assertTrue(e.getMessage()
+            .contains("Unable to find field names for record key in cfg"));
       } else {
-        Assertions.assertTrue(stackTraceToString(e).contains("Property 
hoodie.datasource.write.recordkey.field not found"));
+        Assertions.assertTrue(stackTraceToString(e).contains("Unable to find 
field names for record key in cfg"));
       }
 
     }
 
     try {
-      BuiltinKeyGenerator keyGenerator =
-          (BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(propsWithoutRecordKeyFieldProps);
+      BuiltinKeyGenerator keyGenerator = new 
CustomKeyGenerator(propsWithoutRecordKeyFieldProps);
 
       GenericRecord record = getRecord();
       Row row = KeyGeneratorTestUtilities.getRow(record);
@@ -284,14 +278,10 @@ public class TestCustomKeyGenerator extends 
KeyGeneratorTestUtilities {
     } catch (Exception e) {
       if (useKeyGeneratorClassName) {
         // "Property hoodie.datasource.write.recordkey.field not found" 
exception cause CustomKeyGenerator init fail
-        Assertions.assertTrue(e
-            .getCause()
-            .getCause()
-            .getCause()
-            .getMessage()
-            .contains("Property hoodie.datasource.write.recordkey.field not 
found"));
+        Assertions.assertTrue(e.getMessage()
+            .contains("All of the values for ([]) were either null or empty"));
       } else {
-        Assertions.assertTrue(stackTraceToString(e).contains("Property 
hoodie.datasource.write.recordkey.field not found"));
+        Assertions.assertTrue(stackTraceToString(e).contains("All of the 
values for ([]) were either null or empty"));
       }
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteRecordGenerator.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteRecordGenerator.java
index 1b25ce65054..df69279cc89 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteRecordGenerator.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteRecordGenerator.java
@@ -61,7 +61,11 @@ public class TestGlobalDeleteRecordGenerator extends 
KeyGeneratorTestUtilities {
 
   @Test
   public void testNullRecordKeyFields() {
-    Assertions.assertThrows(IllegalArgumentException.class, () -> new 
GlobalDeleteKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+    GenericRecord record = getRecord();
+    Assertions.assertThrows(StringIndexOutOfBoundsException.class, () ->  {
+      BaseKeyGenerator keyGenerator = new 
GlobalDeleteKeyGenerator(getPropertiesWithoutRecordKeyProp());
+      keyGenerator.getRecordKey(record);
+    });
   }
 
   @Test
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java
index fd299f179c3..fb740d00e2a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java
@@ -68,7 +68,11 @@ public class TestNonpartitionedKeyGenerator extends 
KeyGeneratorTestUtilities {
 
   @Test
   public void testNullRecordKeyFields() {
-    Assertions.assertThrows(IllegalArgumentException.class, () -> new 
NonpartitionedKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+    GenericRecord record = getRecord();
+    Assertions.assertThrows(StringIndexOutOfBoundsException.class, () ->  {
+      BaseKeyGenerator keyGenerator = new 
NonpartitionedKeyGenerator(getPropertiesWithoutRecordKeyProp());
+      keyGenerator.getRecordKey(record);
+    });
   }
 
   @Test
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
index 77d1b34f136..adf522f8354 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
@@ -96,7 +96,11 @@ public class TestSimpleKeyGenerator extends 
KeyGeneratorTestUtilities {
 
   @Test
   public void testNullRecordKeyFields() {
-    assertThrows(IllegalArgumentException.class, () -> new 
SimpleKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+    GenericRecord record = getRecord();
+    Assertions.assertThrows(IndexOutOfBoundsException.class, () ->  {
+      BaseKeyGenerator keyGenerator = new 
SimpleKeyGenerator(getPropertiesWithoutRecordKeyProp());
+      keyGenerator.getRecordKey(record);
+    });
   }
 
   @Test
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java
index ad3edd4495a..45272ec1006 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java
@@ -21,9 +21,11 @@ package org.apache.hudi.keygen.factory;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.AutoRecordGenWrapperKeyGenerator;
 import org.apache.hudi.keygen.ComplexKeyGenerator;
 import org.apache.hudi.keygen.CustomKeyGenerator;
 import org.apache.hudi.keygen.GlobalDeleteKeyGenerator;
+import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
@@ -33,6 +35,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -98,4 +101,14 @@ public class TestCreateKeyGeneratorByTypeWithFactory {
         throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " 
+ keyGenType);
     }
   }
+
+  @Test
+  public void testAutoRecordKeyGenerator() throws IOException {
+    props = new TypedProperties();
+    props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition");
+    props.put(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, "100");
+    props.put(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 1);
+    KeyGenerator keyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
+    Assertions.assertEquals(AutoRecordGenWrapperKeyGenerator.class.getName(), 
keyGenerator.getClass().getName());
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
index c9b58fdff01..61a7a04823a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
@@ -96,8 +96,8 @@ class TestDataSourceDefaults extends ScalaAssertionSupport {
       val props = new TypedProperties()
       props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), 
"partitionField")
 
-      assertThrows(classOf[IllegalArgumentException]) {
-        new SimpleKeyGenerator(props)
+      assertThrows(classOf[IndexOutOfBoundsException]) {
+        new SimpleKeyGenerator(props).getRecordKey(baseRecord)
       }
     }
 
@@ -262,7 +262,7 @@ class TestDataSourceDefaults extends ScalaAssertionSupport {
     }
 
     // Record's key field not specified
-    assertThrows(classOf[IllegalArgumentException]) {
+    assertThrows(classOf[StringIndexOutOfBoundsException]) {
       val props = new TypedProperties()
       props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, 
"partitionField")
       val keyGen = new ComplexKeyGenerator(props)
@@ -494,8 +494,8 @@ class TestDataSourceDefaults extends ScalaAssertionSupport {
       val props = new TypedProperties()
       props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, 
"partitionField")
 
-      assertThrows(classOf[IllegalArgumentException]) {
-        new GlobalDeleteKeyGenerator(props)
+      assertThrows(classOf[StringIndexOutOfBoundsException]) {
+        new GlobalDeleteKeyGenerator(props).getRecordKey(baseRecord)
       }
     }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 9c0c182cd20..8785366fd51 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -467,7 +467,7 @@ class TestHoodieSparkSqlWriter {
     val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
 
     // try write to Hudi
-    assertThrows[IOException] {
+    assertThrows[IllegalArgumentException] {
       HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableOpts - 
DataSourceWriteOptions.PARTITIONPATH_FIELD.key, df)
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/CommonOptionUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/CommonOptionUtils.scala
new file mode 100644
index 00000000000..6748c82d130
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/CommonOptionUtils.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieSparkRecordMerger}
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieWriteConfig
+
+object CommonOptionUtils {
+
+  val commonOpts = Map(
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+    "hoodie.delete.shuffle.parallelism" -> "1",
+    HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key() -> "true",
+    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+    HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
+  )
+  val sparkOpts = Map(HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> 
classOf[HoodieSparkRecordMerger].getName)
+
+  def getWriterReaderOpts(recordType: HoodieRecordType,
+                          opt: Map[String, String] = commonOpts,
+                          enableFileIndex: Boolean = 
DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()):
+  (Map[String, String], Map[String, String]) = {
+    val fileIndexOpt: Map[String, String] =
+      Map(DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.key -> 
enableFileIndex.toString)
+
+    recordType match {
+      case HoodieRecordType.SPARK => (opt ++ sparkOpts, sparkOpts ++ 
fileIndexOpt)
+      case _ => (opt, fileIndexOpt)
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
new file mode 100644
index 00000000000..3f737c9dac3
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hudi.HoodieConversionUtils.toJavaOption
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType, 
WriteOperationType}
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.common.util
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.ExceptionUtil.getRootCause
+import org.apache.hudi.exception.{HoodieException, HoodieKeyGeneratorException}
+import org.apache.hudi.functional.CommonOptionUtils._
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.keygen.{ComplexKeyGenerator, KeyGenUtils, 
NonpartitionedKeyGenerator, SimpleKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.util.JFunction
+import org.apache.hudi.{DataSourceWriteOptions, HoodieDataSourceHelpers, 
ScalaAssertionSupport}
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.{SaveMode, SparkSession, SparkSessionExtensions}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
+
+import java.util.function.Consumer
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+class TestAutoGenerationOfRecordKeys extends HoodieSparkClientTestBase with 
ScalaAssertionSupport {
+  var spark: SparkSession = null
+  val verificationCol: String = "driver"
+  val updatedVerificationVal: String = "driver_update"
+
+  override def getSparkSessionExtensionsInjector: 
util.Option[Consumer[SparkSessionExtensions]] =
+    toJavaOption(
+      Some(
+        JFunction.toJavaConsumer((receiver: SparkSessionExtensions) => new 
HoodieSparkSessionExtension().apply(receiver)))
+    )
+
+  @BeforeEach override def setUp() {
+    initPath()
+    initSparkContexts()
+    spark = sqlContext.sparkSession
+    initTestDataGenerator()
+    initFileSystem()
+  }
+
+  @AfterEach override def tearDown() = {
+    cleanupSparkContexts()
+    cleanupTestDataGenerator()
+    cleanupFileSystem()
+    FileSystem.closeAll()
+    System.gc()
+  }
+
+  @ParameterizedTest
+  @CsvSource(value = Array(
+    "AVRO,insert,COPY_ON_WRITE", "AVRO,bulk_insert,COPY_ON_WRITE", 
"AVRO,insert,MERGE_ON_READ", "AVRO,bulk_insert,MERGE_ON_READ"
+  ))
+  def testRecordKeysAutoGen(recordType: HoodieRecordType, op: String, 
tableType: HoodieTableType): Unit = {
+    testRecordKeysAutoGenInternal(recordType, op, tableType)
+  }
+
+  @Test
+  def testRecordKeyAutoGenWithTimestampBasedKeyGen(): Unit = {
+    testRecordKeysAutoGenInternal(HoodieRecordType.AVRO, "insert", 
HoodieTableType.COPY_ON_WRITE,
+      classOf[TimestampBasedKeyGenerator].getName)
+  }
+
+  @Test
+  def testRecordKeyAutoGenWithComplexKeyGen(): Unit = {
+    testRecordKeysAutoGenInternal(HoodieRecordType.AVRO, "insert", 
HoodieTableType.COPY_ON_WRITE,
+      classOf[ComplexKeyGenerator].getName,
+      complexPartitionPath = true)
+  }
+
+  @Test
+  def testRecordKeyAutoGenWithNonPartitionedKeyGen(): Unit = {
+    testRecordKeysAutoGenInternal(HoodieRecordType.AVRO, "insert", 
HoodieTableType.COPY_ON_WRITE,
+      classOf[NonpartitionedKeyGenerator].getName, complexPartitionPath = 
false, nonPartitionedDataset = true)
+  }
+
+  def testRecordKeysAutoGenInternal(recordType: HoodieRecordType, op: String = 
"insert", tableType: HoodieTableType = HoodieTableType.COPY_ON_WRITE,
+                                    keyGenClass: String = 
classOf[SimpleKeyGenerator].getCanonicalName,
+                                    complexPartitionPath: Boolean = false, 
nonPartitionedDataset: Boolean = false): Unit = {
+    val (vanillaWriteOpts, readOpts) = getWriterReaderOpts(recordType)
+
+    var options: Map[String, String] = vanillaWriteOpts ++ Map(
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> keyGenClass)
+
+    val isTimestampBasedKeyGen: Boolean = 
classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)
+    if (isTimestampBasedKeyGen) {
+      options += Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING"
+      options += Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd"
+      options += Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyyMMdd"
+    }
+
+    if (complexPartitionPath) {
+      options += KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key() -> 
"rider,_hoodie_is_deleted"
+    }
+    if (nonPartitionedDataset) {
+      options = options -- 
Seq(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
+    }
+
+    // add partition Id and instant time
+    options += KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG -> "1"
+    options += KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG -> "100"
+
+    // NOTE: In this test we deliberately removing record-key configuration
+    //       to validate Hudi is handling this case appropriately
+    val writeOpts = options -- Seq(DataSourceWriteOptions.RECORDKEY_FIELD.key)
+
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 5)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF.cache
+
+    //
+    // Step #1: Persist first batch with auto-gen'd record-keys
+    //
+
+    inputDF.write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, op)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType.name())
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+
+    //
+    // Step #2: Persist *same* batch with auto-gen'd record-keys (new record 
keys should
+    //          be generated this time)
+    //
+    val inputDF2 = inputDF
+    inputDF2.write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, op)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType.name())
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val readDF = spark.read.format("hudi")
+      .options(readOpts)
+      .load(basePath)
+    readDF.cache
+
+    val recordKeys = readDF.select(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+      .distinct()
+      .collectAsList()
+      .map(_.getString(0))
+
+    // Validate auto-gen'd keys are globally unique
+    assertEquals(10, recordKeys.size)
+
+    // validate entire batch is present in snapshot read
+    val expectedInputDf = 
inputDF.union(inputDF2).drop("partition","rider","_hoodie_is_deleted")
+    val actualDf = readDF.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: 
_*).drop("partition","rider","_hoodie_is_deleted")
+    assertEquals(expectedInputDf.except(actualDf).count, 0)
+  }
+
+  @ParameterizedTest
+  @CsvSource(value = Array(
+    
"hoodie.populate.meta.fields,false","hoodie.combine.before.insert,true","hoodie.datasource.write.insert.drop.duplicates,true"
+  ))
+  def testRecordKeysAutoGenInvalidParams(configKey: String, configValue: 
String): Unit = {
+    val (writeOpts, _) = getWriterReaderOpts(HoodieRecordType.AVRO)
+
+    // NOTE: In this test we deliberately removing record-key configuration
+    //       to validate Hudi is handling this case appropriately
+    var opts = writeOpts -- Seq(DataSourceWriteOptions.RECORDKEY_FIELD.key)
+
+    // add partition Id and instant time
+    opts += KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG -> "1"
+    opts += KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG -> "100"
+
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 1)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    val e = assertThrows(classOf[HoodieKeyGeneratorException]) {
+      inputDF.write.format("hudi")
+        .options(opts)
+        .option(DataSourceWriteOptions.OPERATION.key, "insert")
+        .option(configKey, configValue)
+        .mode(SaveMode.Overwrite)
+        .save(basePath)
+    }
+
+    assertTrue(getRootCause(e).getMessage.contains(configKey + " is not 
supported with auto generation of record keys"))
+  }
+
+
+  @Test
+  def testRecordKeysAutoGenEnableToDisable(): Unit = {
+    val (vanillaWriteOpts, readOpts) = 
getWriterReaderOpts(HoodieRecordType.AVRO)
+
+    var options: Map[String, String] = vanillaWriteOpts ++ Map(
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
classOf[SimpleKeyGenerator].getCanonicalName)
+
+    // NOTE: In this test we deliberately removing record-key configuration
+    //       to validate Hudi is handling this case appropriately
+    var writeOpts = options -- Seq(DataSourceWriteOptions.RECORDKEY_FIELD.key)
+
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 5)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF.cache
+
+
+    // add partition Id and instant time
+    writeOpts += KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG -> "1"
+    writeOpts += KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG -> "100"
+
+    //
+    // Step #1: Persist first batch with auto-gen'd record-keys
+    //
+    inputDF.write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, "insert")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+
+    //
+    // Step #2: Insert w/ explicit record key config. Should fail since we 
can't modify this property.
+    //
+    val e = assertThrows(classOf[HoodieException]) {
+      val inputDF2 = inputDF
+      inputDF2.write.format("hudi")
+        .options(writeOpts ++ Map(
+          DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key"
+        ))
+        .option(DataSourceWriteOptions.OPERATION.key, "insert")
+        .mode(SaveMode.Append)
+        .save(basePath)
+    }
+
+    val expectedMsg = s"RecordKey:\t_row_key\tnull"
+    assertTrue(getRootCause(e).getMessage.contains(expectedMsg))
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 6b1773807fe..b229d4f7c2d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -36,6 +36,7 @@ import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.metrics.HoodieMetricsConfig
 import org.apache.hudi.exception.ExceptionUtil.getRootCause
 import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.functional.CommonOptionUtils._
 import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
 import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
@@ -69,19 +70,6 @@ import org.junit.jupiter.api.function.Executable
  */
 class TestCOWDataSource extends HoodieSparkClientTestBase with 
ScalaAssertionSupport {
   var spark: SparkSession = null
-  val commonOpts = Map(
-    "hoodie.insert.shuffle.parallelism" -> "4",
-    "hoodie.upsert.shuffle.parallelism" -> "4",
-    "hoodie.bulkinsert.shuffle.parallelism" -> "2",
-    "hoodie.delete.shuffle.parallelism" -> "1",
-    HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key() -> "true",
-    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
-    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
-    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
-    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
-    HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
-  )
-  val sparkOpts = Map(HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> 
classOf[HoodieSparkRecordMerger].getName)
 
   val verificationCol: String = "driver"
   val updatedVerificationVal: String = "driver_update"
@@ -899,7 +887,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
   }
 
   @ParameterizedTest
-  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
+  @EnumSource(value = classOf[HoodieRecordType], names = Array("SPARK"))
   def testSparkPartitionByWithCustomKeyGenerator(recordType: 
HoodieRecordType): Unit = {
     val (writeOpts, readOpts) = 
getWriterReaderOptsLessPartitionPath(recordType)
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
index 96108b0d550..f2d1cd58e65 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
@@ -55,6 +55,7 @@ class TestStreamingSource extends StreamTest {
       HoodieTableMetaClient.withPropertyBuilder()
         .setTableType(COPY_ON_WRITE)
         .setTableName(getTableName(tablePath))
+        .setRecordKeyFields("id")
         
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
         .setPreCombineField("ts")
         .initTable(spark.sessionState.newHadoopConf(), tablePath)
@@ -107,6 +108,7 @@ class TestStreamingSource extends StreamTest {
       HoodieTableMetaClient.withPropertyBuilder()
         .setTableType(MERGE_ON_READ)
         .setTableName(getTableName(tablePath))
+        .setRecordKeyFields("id")
         
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
         .setPreCombineField("ts")
         .initTable(spark.sessionState.newHadoopConf(), tablePath)
@@ -153,6 +155,7 @@ class TestStreamingSource extends StreamTest {
       HoodieTableMetaClient.withPropertyBuilder()
         .setTableType(COPY_ON_WRITE)
         .setTableName(getTableName(tablePath))
+        .setRecordKeyFields("id")
         
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
         .setPreCombineField("ts")
         .initTable(spark.sessionState.newHadoopConf(), tablePath)
@@ -185,6 +188,7 @@ class TestStreamingSource extends StreamTest {
       val metaClient = HoodieTableMetaClient.withPropertyBuilder()
         .setTableType(COPY_ON_WRITE)
         .setTableName(getTableName(tablePath))
+        .setRecordKeyFields("id")
         
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
         .setPreCombineField("ts")
         .initTable(spark.sessionState.newHadoopConf(), tablePath)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
index 4e726317844..35ebe872e40 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
@@ -109,16 +109,6 @@ class TestHoodieOptionConfig extends 
SparkClientFunctionalTestHarness {
         StructField("dt", StringType, true))
     )
 
-    // miss primaryKey parameter
-    val sqlOptions1 = baseSqlOptions ++ Map(
-      "type" -> "mor"
-    )
-
-    val e1 = intercept[IllegalArgumentException] {
-      HoodieOptionConfig.validateTable(spark, schema, sqlOptions1)
-    }
-    assertTrue(e1.getMessage.contains("No `primaryKey` is specified."))
-
     // primary field not found
     val sqlOptions2 = baseSqlOptions ++ Map(
       "primaryKey" -> "xxx",
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index dfe81391105..d40bae13209 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
-import org.apache.hudi.common.model.WriteOperationType
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieDuplicateKeyException
@@ -28,6 +28,7 @@ import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata
 import 
org.apache.spark.sql.hudi.command.HoodieSparkValidateDuplicateKeyRecordMerger
+import org.junit.jupiter.api.Assertions.assertEquals
 
 import java.io.File
 
@@ -1267,4 +1268,41 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
       })
     }
   }
+
+  test("Test Insert Into with auto generate record keys") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Create a partitioned table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  dt string,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | partitioned by (dt)
+           | location '${tmp.getCanonicalPath}'
+       """.stripMargin)
+
+      // Note: Do not write the field alias, the partition field must be 
placed last.
+      spark.sql(
+        s"""
+           | insert into $tableName values
+           | (1, 'a1', 10, 1000, "2021-01-05"),
+           | (2, 'a2', 20, 2000, "2021-01-06"),
+           | (3, 'a3', 30, 3000, "2021-01-07")
+              """.stripMargin)
+
+      checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+        Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+        Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+        Seq(3, "a3", 30.0, 3000, "2021-01-07")
+      )
+
+      val df = spark.read.format("hudi").load(tmp.getCanonicalPath)
+      assertEquals(3, 
df.select(HoodieRecord.RECORD_KEY_METADATA_FIELD).count())
+    }
+  }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 64686e3f38f..7d1d0758955 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -70,8 +70,8 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.internal.schema.InternalSchema;
-import org.apache.hudi.keygen.KeyGenerator;
-import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
+import org.apache.hudi.keygen.BuiltinKeyGenerator;
+import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.metrics.HoodieMetrics;
@@ -103,8 +103,10 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.HoodieInternalRowUtils;
@@ -124,6 +126,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -147,6 +150,7 @@ import static 
org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT;
 import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
 import static 
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
+import static 
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName;
 import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
 import static 
org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.MUTLI_WRITER_SOURCE_CHECKPOINT_ID;
 import static 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_FORCE_SKIP_PROP;
@@ -190,10 +194,7 @@ public class DeltaSync implements Serializable, Closeable {
    */
   private transient Option<Transformer> transformer;
 
-  /**
-   * Extract the key for the target table.
-   */
-  private KeyGenerator keyGenerator;
+  private String keyGenClassName;
 
   /**
    * Filesystem used.
@@ -257,7 +258,6 @@ public class DeltaSync implements Serializable, Closeable {
   private transient HoodieIngestionMetrics metrics;
   private transient HoodieMetrics hoodieMetrics;
 
-
   /**
    * Unique identifier of the deltastreamer
    * */
@@ -269,6 +269,8 @@ public class DeltaSync implements Serializable, Closeable {
    */
   private transient String latestCheckpointWritten;
 
+  private final boolean autoGenerateRecordKeys;
+
   public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, 
SchemaProvider schemaProvider,
                    TypedProperties props, JavaSparkContext jssc, FileSystem 
fs, Configuration conf,
                    Function<SparkRDDWriteClient, Boolean> 
onInitializingHoodieWriteClient) throws IOException {
@@ -281,7 +283,8 @@ public class DeltaSync implements Serializable, Closeable {
     this.props = props;
     this.userProvidedSchemaProvider = schemaProvider;
     this.processedSchema = new SchemaSet();
-    this.keyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
+    this.autoGenerateRecordKeys = 
KeyGenUtils.enableAutoGenerateRecordKeys(props);
+    this.keyGenClassName = getKeyGeneratorClassName(new 
TypedProperties(props));
     refreshTimeline();
     // Register User Provided schema first
     registerAvroSchemas(schemaProvider);
@@ -362,7 +365,7 @@ public class DeltaSync implements Serializable, Closeable {
   private void initializeEmptyTable() throws IOException {
     this.commitsTimelineOpt = Option.empty();
     this.allCommitsTimelineOpt = Option.empty();
-    String partitionColumns = 
SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
+    String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
     HoodieTableMetaClient.withPropertyBuilder()
         .setTableType(cfg.tableType)
         .setTableName(cfg.targetTableName)
@@ -373,8 +376,7 @@ public class DeltaSync implements Serializable, Closeable {
         
.setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
         
.setPopulateMetaFields(props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(),
             HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))
-        
.setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
-            keyGenerator.getClass().getName()))
+        .setKeyGeneratorClassProp(keyGenClassName)
         .setPreCombineField(cfg.sourceOrderingField)
         
.setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
             
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
@@ -396,8 +398,9 @@ public class DeltaSync implements Serializable, Closeable {
 
     // Refresh Timeline
     refreshTimeline();
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
 
-    Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> 
srcRecordsWithCkpt = readFromSource(commitsTimelineOpt);
+    Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> 
srcRecordsWithCkpt = readFromSource(commitsTimelineOpt, instantTime);
 
     if (srcRecordsWithCkpt != null) {
       final JavaRDD<HoodieRecord> recordsFromSource = 
srcRecordsWithCkpt.getRight().getRight();
@@ -438,7 +441,7 @@ public class DeltaSync implements Serializable, Closeable {
         }
       }
 
-      result = writeToSink(recordsFromSource,
+      result = writeToSink(instantTime, recordsFromSource,
           srcRecordsWithCkpt.getRight().getLeft(), metrics, 
overallTimerContext);
     }
 
@@ -470,14 +473,14 @@ public class DeltaSync implements Serializable, Closeable 
{
    * of schemaProvider, checkpointStr and hoodieRecord
    * @throws Exception in case of any Exception
    */
-  public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> 
readFromSource(Option<HoodieTimeline> commitsTimelineOpt) throws IOException {
+  public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> 
readFromSource(Option<HoodieTimeline> commitsTimelineOpt, String instantTime) 
throws IOException {
     // Retrieve the previous round checkpoints, if any
     Option<String> resumeCheckpointStr = Option.empty();
     if (commitsTimelineOpt.isPresent()) {
       resumeCheckpointStr = getCheckpointToResume(commitsTimelineOpt);
     } else {
       // initialize the table for the first time.
-      String partitionColumns = 
SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
+      String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
       HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(cfg.tableType)
           .setTableName(cfg.targetTableName)
@@ -488,8 +491,7 @@ public class DeltaSync implements Serializable, Closeable {
           
.setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
           
.setPopulateMetaFields(props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(),
               HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))
-          
.setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
-              keyGenerator.getClass().getName()))
+          .setKeyGeneratorClassProp(keyGenClassName)
           
.setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
               
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
           .setShouldDropPartitionColumns(isDropPartitionColumns())
@@ -507,7 +509,7 @@ public class DeltaSync implements Serializable, Closeable {
     Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> sourceDataToSync 
= null;
     while (curRetryCount++ < maxRetryCount && sourceDataToSync == null) {
       try {
-        sourceDataToSync = fetchFromSource(resumeCheckpointStr);
+        sourceDataToSync = fetchFromSource(resumeCheckpointStr, instantTime);
       } catch (HoodieSourceTimeoutException e) {
         if (curRetryCount >= maxRetryCount) {
           throw e;
@@ -524,7 +526,7 @@ public class DeltaSync implements Serializable, Closeable {
     return sourceDataToSync;
   }
 
-  private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> 
fetchFromSource(Option<String> resumeCheckpointStr) {
+  private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> 
fetchFromSource(Option<String> resumeCheckpointStr, String instantTime) {
     HoodieRecordType recordType = createRecordMerger(props).getRecordType();
     if (recordType == HoodieRecordType.SPARK && 
HoodieTableType.valueOf(cfg.tableType) == HoodieTableType.MERGE_ON_READ
         && 
HoodieLogBlockType.fromId(props.getProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),
 "avro"))
@@ -620,25 +622,42 @@ public class DeltaSync implements Serializable, Closeable 
{
     }
 
     boolean shouldCombine = cfg.filterDupes || 
cfg.operation.equals(WriteOperationType.UPSERT);
-    Set<String> partitionColumns = getPartitionColumns(keyGenerator, props);
+    Set<String> partitionColumns = getPartitionColumns(props);
     JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
 
     JavaRDD<HoodieRecord> records;
     SerializableSchema avroSchema = new 
SerializableSchema(schemaProvider.getTargetSchema());
     SerializableSchema processedAvroSchema = new 
SerializableSchema(isDropPartitionColumns() ? 
HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
     if (recordType == HoodieRecordType.AVRO) {
-      records = avroRDD.map(record -> {
-        GenericRecord gr = isDropPartitionColumns() ? 
HoodieAvroUtils.removeFields(record, partitionColumns) : record;
-        HoodieRecordPayload payload = shouldCombine ? 
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
-            (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, 
cfg.sourceOrderingField, false, props.getBoolean(
-                
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-                
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
-            : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
-        return new HoodieAvroRecord<>(keyGenerator.getKey(record), payload);
-      });
+      records = avroRDD.mapPartitions(
+          (FlatMapFunction<Iterator<GenericRecord>, HoodieRecord>) 
genericRecordIterator -> {
+            if (autoGenerateRecordKeys) {
+              
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(TaskContext.getPartitionId()));
+              
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
+            }
+            BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
+            List<HoodieRecord> avroRecords = new ArrayList<>();
+            while (genericRecordIterator.hasNext()) {
+              GenericRecord genRec = genericRecordIterator.next();
+              HoodieKey hoodieKey = new 
HoodieKey(builtinKeyGenerator.getRecordKey(genRec), 
builtinKeyGenerator.getPartitionPath(genRec));
+              GenericRecord gr = isDropPartitionColumns() ? 
HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
+              HoodieRecordPayload payload = shouldCombine ? 
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
+                  (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, 
cfg.sourceOrderingField, false, props.getBoolean(
+                      
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+                      
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
+                  : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
+              avroRecords.add(new HoodieAvroRecord<>(hoodieKey, payload));
+            }
+            return avroRecords.iterator();
+          });
     } else if (recordType == HoodieRecordType.SPARK) {
       // TODO we should remove it if we can read InternalRow from source.
       records = avroRDD.mapPartitions(itr -> {
+        if (autoGenerateRecordKeys) {
+          props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(TaskContext.getPartitionId()));
+          props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, 
instantTime);
+        }
+        BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
         StructType baseStructType = 
AvroConversionUtils.convertAvroSchemaToStructType(processedAvroSchema.get());
         StructType targetStructType = isDropPartitionColumns() ? 
AvroConversionUtils
             
.convertAvroSchemaToStructType(HoodieAvroUtils.removeFields(processedAvroSchema.get(),
 partitionColumns)) : baseStructType;
@@ -646,9 +665,8 @@ public class DeltaSync implements Serializable, Closeable {
 
         return new CloseableMappingIterator<>(ClosableIterator.wrap(itr), rec 
-> {
           InternalRow row = (InternalRow) deserializer.deserialize(rec).get();
-          SparkKeyGeneratorInterface keyGenerator = 
(SparkKeyGeneratorInterface) this.keyGenerator;
-          String recordKey = keyGenerator.getRecordKey(row, 
baseStructType).toString();
-          String partitionPath = keyGenerator.getPartitionPath(row, 
baseStructType).toString();
+          String recordKey = builtinKeyGenerator.getRecordKey(row, 
baseStructType).toString();
+          String partitionPath = builtinKeyGenerator.getPartitionPath(row, 
baseStructType).toString();
           return new HoodieSparkRecord(new HoodieKey(recordKey, partitionPath),
               HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, 
targetStructType).apply(row), targetStructType, false);
         });
@@ -760,13 +778,14 @@ public class DeltaSync implements Serializable, Closeable 
{
   /**
    * Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive 
if needed.
    *
+   * @param instantTime         instant time to use for ingest.
    * @param records             Input Records
    * @param checkpointStr       Checkpoint String
    * @param metrics             Metrics
    * @param overallTimerContext Timer Context
    * @return Option Compaction instant if one is scheduled
    */
-  private Pair<Option<String>, JavaRDD<WriteStatus>> 
writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr,
+  private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(String 
instantTime, JavaRDD<HoodieRecord> records, String checkpointStr,
                                                                  
HoodieIngestionMetrics metrics,
                                                                  Timer.Context 
overallTimerContext) {
     Option<String> scheduledCompactionInstant = Option.empty();
@@ -776,9 +795,7 @@ public class DeltaSync implements Serializable, Closeable {
     }
 
     boolean isEmpty = records.isEmpty();
-
-    // try to start a new commit
-    String instantTime = startCommit();
+    instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
     LOG.info("Starting commit  : " + instantTime);
 
     JavaRDD<WriteStatus> writeStatusRDD;
@@ -892,18 +909,20 @@ public class DeltaSync implements Serializable, Closeable 
{
    *
    * @return Instant time of the commit
    */
-  private String startCommit() {
+  private String startCommit(String instantTime, boolean retryEnabled) {
     final int maxRetries = 2;
     int retryNum = 1;
     RuntimeException lastException = null;
     while (retryNum <= maxRetries) {
       try {
-        String instantTime = HoodieActiveTimeline.createNewInstantTime();
         String commitActionType = 
CommitUtils.getCommitActionType(cfg.operation, 
HoodieTableType.valueOf(cfg.tableType));
         writeClient.startCommitWithTime(instantTime, commitActionType);
         return instantTime;
       } catch (IllegalArgumentException ie) {
         lastException = ie;
+        if (!retryEnabled) {
+          throw ie;
+        }
         LOG.error("Got error trying to start a new commit. Retrying after 
sleeping for a sec", ie);
         retryNum++;
         try {
@@ -912,6 +931,7 @@ public class DeltaSync implements Serializable, Closeable {
           // No-Op
         }
       }
+      instantTime = HoodieActiveTimeline.createNewInstantTime();
     }
     throw lastException;
   }
@@ -974,7 +994,7 @@ public class DeltaSync implements Serializable, Closeable {
   private void reInitWriteClient(Schema sourceSchema, Schema targetSchema, 
JavaRDD<HoodieRecord> records) throws IOException {
     LOG.info("Setting up new Hoodie Write Client");
     if (isDropPartitionColumns()) {
-      targetSchema = HoodieAvroUtils.removeFields(targetSchema, 
getPartitionColumns(keyGenerator, props));
+      targetSchema = HoodieAvroUtils.removeFields(targetSchema, 
getPartitionColumns(props));
     }
     registerAvroSchemas(sourceSchema, targetSchema);
     final HoodieWriteConfig initialWriteConfig = 
getHoodieClientConfig(targetSchema);
@@ -1205,12 +1225,11 @@ public class DeltaSync implements Serializable, 
Closeable {
   /**
    * Get the partition columns as a set of strings.
    *
-   * @param keyGenerator KeyGenerator
    * @param props TypedProperties
    * @return Set of partition columns.
    */
-  private Set<String> getPartitionColumns(KeyGenerator keyGenerator, 
TypedProperties props) {
-    String partitionColumns = 
SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
+  private Set<String> getPartitionColumns(TypedProperties props) {
+    String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
     return 
Arrays.stream(partitionColumns.split(",")).collect(Collectors.toSet());
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index d749b15d35d..3f09982ab55 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -100,6 +100,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.AnalysisException;
@@ -601,7 +602,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   @Test
   public void testPropsWithInvalidKeyGenerator() throws Exception {
-    Exception e = assertThrows(IOException.class, () -> {
+    Exception e = assertThrows(SparkException.class, () -> {
       String tableBasePath = basePath + "/test_table_invalid_key_gen";
       HoodieDeltaStreamer deltaStreamer =
           new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT,
@@ -610,7 +611,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     }, "Should error out when setting the key generator class property to an 
invalid value");
     // expected
     LOG.debug("Expected error during getting the key generator", e);
-    assertTrue(e.getMessage().contains("Could not load key generator class"));
+    assertTrue(e.getMessage().contains("Could not load key generator class 
invalid"));
   }
 
   private static Stream<Arguments> provideInferKeyGenArgs() {
@@ -1518,7 +1519,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
    * 1 ===============> HUDI TABLE 2 (incr-pull with transform) (incr-pull) 
Hudi Table 1 is synced with Hive.
    */
   @ParameterizedTest
-  @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
+  @EnumSource(value = HoodieRecordType.class, names = {"AVRO","SPARK"})
   public void 
testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline(HoodieRecordType
 recordType) throws Exception {
     String tableBasePath = basePath + "/test_table2";
     String downstreamTableBasePath = basePath + "/test_downstream_table2";
@@ -2629,10 +2630,10 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
     // change cow to mor
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-            .setConf(new Configuration(fs.getConf()))
-            .setBasePath(cfg.targetBasePath)
-            .setLoadActiveTimelineOnLoad(false)
-            .build();
+        .setConf(new Configuration(fs.getConf()))
+        .setBasePath(cfg.targetBasePath)
+        .setLoadActiveTimelineOnLoad(false)
+        .build();
     Properties hoodieProps = new Properties();
     hoodieProps.load(fs.open(new Path(cfg.targetBasePath + 
"/.hoodie/hoodie.properties")));
     LOG.info("old props: {}", hoodieProps);
@@ -2670,6 +2671,31 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
+  @Test
+  public void testAutoGenerateRecordKeys() throws Exception {
+    boolean useSchemaProvider = false;
+    List<String> transformerClassNames = null;
+    PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
+    int parquetRecordsCount = 100;
+    boolean hasTransformer = transformerClassNames != null && 
!transformerClassNames.isEmpty();
+    prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, 
FIRST_PARQUET_FILE_NAME, false, null, null);
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", 
"target.avsc", PROPS_FILENAME_TEST_PARQUET,
+        PARQUET_SOURCE_ROOT, false, "partition_path", "");
+
+    String tableBasePath = basePath + "/test_parquet_table" + testNum;
+    HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+        transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
+        useSchemaProvider, 100000, false, null, null, "timestamp", null);
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(config, jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, 
sqlContext);
+
+    prepareParquetDFSFiles(200, PARQUET_SOURCE_ROOT, "2.parquet", false, null, 
null);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(parquetRecordsCount + 200, tableBasePath, 
sqlContext);
+    testNum++;
+  }
+
   class TestDeltaSync extends DeltaSync {
 
     public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession 
sparkSession, SchemaProvider schemaProvider, TypedProperties props,

Reply via email to