This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c7f1a78  [HUDI-728]: Implemented custom key generator (#1433)
c7f1a78 is described below

commit c7f1a781ab4ff3784d53a102364fd85e379811d1
Author: Pratyaksh Sharma <[email protected]>
AuthorDate: Thu Jul 9 17:05:07 2020 +0530

    [HUDI-728]: Implemented custom key generator (#1433)
---
 .../exception/HoodieDeltaStreamerException.java    |   4 +-
 .../apache/hudi/keygen/ComplexKeyGenerator.java    |  56 ++++---
 .../org/apache/hudi/keygen/CustomKeyGenerator.java | 126 +++++++++++++++
 .../hudi/keygen/GlobalDeleteKeyGenerator.java      |  13 +-
 .../hudi/keygen/NonpartitionedKeyGenerator.java    |   6 +-
 .../org/apache/hudi/keygen/SimpleKeyGenerator.java |  23 +--
 .../hudi}/keygen/TimestampBasedKeyGenerator.java   |  25 ++-
 .../hudi/keygen/TestComplexKeyGenerator.java       |  88 +++++++++++
 .../apache/hudi/keygen/TestCustomKeyGenerator.java | 169 +++++++++++++++++++++
 .../hudi/keygen/TestGlobalDeleteKeyGenerator.java  |  78 ++++++++++
 .../hudi/keygen/TestKeyGeneratorUtilities.java}    |  35 ++---
 .../apache/hudi/keygen/TestSimpleKeyGenerator.java |  97 ++++++++++++
 .../keygen/TestTimestampBasedKeyGenerator.java     |   7 +-
 hudi-utilities/pom.xml                             |   8 +
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   2 +-
 15 files changed, 649 insertions(+), 88 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieDeltaStreamerException.java
 
b/hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java
similarity index 91%
rename from 
hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieDeltaStreamerException.java
rename to 
hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java
index 0c7165b..e939d62 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieDeltaStreamerException.java
+++ 
b/hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java
@@ -16,9 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.utilities.exception;
-
-import org.apache.hudi.exception.HoodieException;
+package org.apache.hudi.exception;
 
 public class HoodieDeltaStreamerException extends HoodieException {
 
diff --git 
a/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java 
b/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
index 9c31286..b3ab3d0 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
@@ -49,21 +49,39 @@ public class ComplexKeyGenerator extends KeyGenerator {
 
   public ComplexKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = 
Arrays.asList(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(","))
-            .stream().map(String::trim).collect(Collectors.toList());
+    this.recordKeyFields = 
Arrays.stream(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
     this.partitionPathFields =
-        
Arrays.asList(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(","))
-                .stream().map(String::trim).collect(Collectors.toList());
+        
Arrays.stream(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
     this.hiveStylePartitioning = 
props.getBoolean(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(),
         
Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL()));
   }
 
   @Override
   public HoodieKey getKey(GenericRecord record) {
-    if (recordKeyFields == null || partitionPathFields == null) {
-      throw new HoodieKeyException("Unable to find field names for record key 
or partition path in cfg");
+    String recordKey = getRecordKey(record);
+    StringBuilder partitionPath = new StringBuilder();
+    for (String partitionPathField : partitionPathFields) {
+      partitionPath.append(getPartitionPath(record, partitionPathField));
+      partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
+    }
+    partitionPath.deleteCharAt(partitionPath.length() - 1);
+
+    return new HoodieKey(recordKey, partitionPath.toString());
+  }
+
+  String getPartitionPath(GenericRecord record, String partitionPathField) {
+    StringBuilder partitionPath = new StringBuilder();
+    String fieldVal = DataSourceUtils.getNestedFieldValAsString(record, 
partitionPathField, true);
+    if (fieldVal == null || fieldVal.isEmpty()) {
+      partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + 
DEFAULT_PARTITION_PATH
+          : DEFAULT_PARTITION_PATH);
+    } else {
+      partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + 
fieldVal : fieldVal);
     }
+    return partitionPath.toString();
+  }
 
+  String getRecordKey(GenericRecord record) {
     boolean keyIsNullEmpty = true;
     StringBuilder recordKey = new StringBuilder();
     for (String recordKeyField : recordKeyFields) {
@@ -80,30 +98,8 @@ public class ComplexKeyGenerator extends KeyGenerator {
     recordKey.deleteCharAt(recordKey.length() - 1);
     if (keyIsNullEmpty) {
       throw new HoodieKeyException("recordKey values: \"" + recordKey + "\" 
for fields: "
-          + recordKeyFields.toString() + " cannot be entirely null or empty.");
+        + recordKeyFields.toString() + " cannot be entirely null or empty.");
     }
-
-    StringBuilder partitionPath = new StringBuilder();
-    for (String partitionPathField : partitionPathFields) {
-      String fieldVal = DataSourceUtils.getNestedFieldValAsString(record, 
partitionPathField, true);
-      if (fieldVal == null || fieldVal.isEmpty()) {
-        partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" 
+ DEFAULT_PARTITION_PATH
-                : DEFAULT_PARTITION_PATH);
-      } else {
-        partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" 
+ fieldVal : fieldVal);
-      }
-      partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
-    }
-    partitionPath.deleteCharAt(partitionPath.length() - 1);
-
-    return new HoodieKey(recordKey.toString(), partitionPath.toString());
-  }
-
-  public List<String> getRecordKeyFields() {
-    return recordKeyFields;
-  }
-
-  public List<String> getPartitionPathFields() {
-    return partitionPathFields;
+    return recordKey.toString();
   }
 }
diff --git 
a/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java 
b/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
new file mode 100644
index 0000000..be2d1ef
--- /dev/null
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.exception.HoodieDeltaStreamerException;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * This is a generic implementation of KeyGenerator where users can configure 
record key as a single field or a combination of fields.
+ * Similarly partition path can be configured to have multiple fields or only 
one field. This class expects value for prop
+ * "hoodie.datasource.write.partitionpath.field" in a specific format. For 
example:
+ *
+ * properties.put("hoodie.datasource.write.partitionpath.field", 
"field1:PartitionKeyType1,field2:PartitionKeyType2").
+ *
+ * The complete partition path is created as <value for field1 basis 
PartitionKeyType1>/<value for field2 basis PartitionKeyType2> and so on.
+ *
+ * Few points to consider:
+ * 1. If you want to customize some partition path field on a timestamp basis, 
you can use field1:timestampBased
+ * 2. If you simply want to have the value of your configured field in the 
partition path, use field1:simple
+ * 3. If you want your table to be non partitioned, simply leave it as blank.
+ *
+ * RecordKey is internally generated using either SimpleKeyGenerator or 
ComplexKeyGenerator.
+ */
+public class CustomKeyGenerator extends KeyGenerator {
+
+  protected final List<String> recordKeyFields;
+  protected final List<String> partitionPathFields;
+  protected final TypedProperties properties;
+  private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+  private static final String SPLIT_REGEX = ":";
+
+  /**
+   * Used as a part of config in CustomKeyGenerator.java.
+   */
+  public enum PartitionKeyType {
+    SIMPLE, TIMESTAMP
+  }
+
+  public CustomKeyGenerator(TypedProperties props) {
+    super(props);
+    this.properties = props;
+    this.recordKeyFields = 
Arrays.stream(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
+    this.partitionPathFields =
+      
Arrays.stream(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
+  }
+
+  @Override
+  public HoodieKey getKey(GenericRecord record) {
+    //call function to get the record key
+    String recordKey = getRecordKey(record);
+    //call function to get the partition key based on the type for that 
partition path field
+    String partitionPath = getPartitionPath(record);
+    return new HoodieKey(recordKey, partitionPath);
+  }
+
+  private String getPartitionPath(GenericRecord record) {
+    if (partitionPathFields == null) {
+      throw new HoodieKeyException("Unable to find field names for partition 
path in cfg");
+    }
+
+    String partitionPathField;
+    StringBuilder partitionPath = new StringBuilder();
+
+    //Corresponds to no partition case
+    if (partitionPathFields.size() == 1 && 
partitionPathFields.get(0).isEmpty()) {
+      return "";
+    }
+    for (String field : partitionPathFields) {
+      String[] fieldWithType = field.split(SPLIT_REGEX);
+      if (fieldWithType.length != 2) {
+        throw new HoodieKeyException("Unable to find field names for partition 
path in proper format");
+      }
+
+      partitionPathField = fieldWithType[0];
+      PartitionKeyType keyType = 
PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
+      switch (keyType) {
+        case SIMPLE:
+          partitionPath.append(new 
SimpleKeyGenerator(properties).getPartitionPath(record, partitionPathField));
+          break;
+        case TIMESTAMP:
+          partitionPath.append(new 
TimestampBasedKeyGenerator(properties).getPartitionPath(record, 
partitionPathField));
+          break;
+        default:
+          throw new HoodieDeltaStreamerException("Please provide valid 
PartitionKeyType with fields! You provided: " + keyType);
+      }
+
+      partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
+    }
+    partitionPath.deleteCharAt(partitionPath.length() - 1);
+
+    return partitionPath.toString();
+  }
+
+  private String getRecordKey(GenericRecord record) {
+    if (recordKeyFields == null || recordKeyFields.isEmpty()) {
+      throw new HoodieKeyException("Unable to find field names for record key 
in cfg");
+    }
+
+    return recordKeyFields.size() == 1 ? new 
SimpleKeyGenerator(properties).getRecordKey(record) : new 
ComplexKeyGenerator(properties).getRecordKey(record);
+  }
+}
diff --git 
a/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java 
b/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
index 315c265..37b0529 100644
--- 
a/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
+++ 
b/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
@@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericRecord;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Key generator for deletes using global indices. Global index deletes do not 
require partition value
@@ -43,15 +44,15 @@ public class GlobalDeleteKeyGenerator extends KeyGenerator {
 
   public GlobalDeleteKeyGenerator(TypedProperties config) {
     super(config);
-    this.recordKeyFields = 
Arrays.asList(config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(","));
+    this.recordKeyFields = 
Arrays.stream(config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
   }
 
   @Override
   public HoodieKey getKey(GenericRecord record) {
-    if (recordKeyFields == null) {
-      throw new HoodieKeyException("Unable to find field names for record key 
or partition path in cfg");
-    }
+    return new HoodieKey(getRecordKey(record), EMPTY_PARTITION);
+  }
 
+  String getRecordKey(GenericRecord record) {
     boolean keyIsNullEmpty = true;
     StringBuilder recordKey = new StringBuilder();
     for (String recordKeyField : recordKeyFields) {
@@ -68,9 +69,9 @@ public class GlobalDeleteKeyGenerator extends KeyGenerator {
     recordKey.deleteCharAt(recordKey.length() - 1);
     if (keyIsNullEmpty) {
       throw new HoodieKeyException("recordKey values: \"" + recordKey + "\" 
for fields: "
-          + recordKeyFields.toString() + " cannot be entirely null or empty.");
+        + recordKeyFields.toString() + " cannot be entirely null or empty.");
     }
 
-    return new HoodieKey(recordKey.toString(), EMPTY_PARTITION);
+    return recordKey.toString();
   }
 }
diff --git 
a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
 
b/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
index f5b32a0..e790b46 100644
--- 
a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
+++ 
b/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.keygen;
 
+import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieKey;
@@ -28,12 +29,15 @@ import org.apache.avro.generic.GenericRecord;
 /**
  * Simple Key generator for unpartitioned Hive Tables.
  */
-public class NonpartitionedKeyGenerator extends SimpleKeyGenerator {
+public class NonpartitionedKeyGenerator extends KeyGenerator {
 
   private static final String EMPTY_PARTITION = "";
 
+  protected final String recordKeyField;
+
   public NonpartitionedKeyGenerator(TypedProperties props) {
     super(props);
+    this.recordKeyField = 
props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY());
   }
 
   @Override
diff --git 
a/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java 
b/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
index dde321d..e9b9396 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
@@ -49,15 +49,12 @@ public class SimpleKeyGenerator extends KeyGenerator {
 
   @Override
   public HoodieKey getKey(GenericRecord record) {
-    if (recordKeyField == null || partitionPathField == null) {
-      throw new HoodieKeyException("Unable to find field names for record key 
or partition path in cfg");
-    }
-
-    String recordKey = DataSourceUtils.getNestedFieldValAsString(record, 
recordKeyField, true);
-    if (recordKey == null || recordKey.isEmpty()) {
-      throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for 
field: \"" + recordKeyField + "\" cannot be null or empty.");
-    }
+    String recordKey = getRecordKey(record);
+    String partitionPath = getPartitionPath(record, partitionPathField);
+    return new HoodieKey(recordKey, partitionPath);
+  }
 
+  String getPartitionPath(GenericRecord record, String partitionPathField) {
     String partitionPath = DataSourceUtils.getNestedFieldValAsString(record, 
partitionPathField, true);
     if (partitionPath == null || partitionPath.isEmpty()) {
       partitionPath = DEFAULT_PARTITION_PATH;
@@ -66,6 +63,14 @@ public class SimpleKeyGenerator extends KeyGenerator {
       partitionPath = partitionPathField + "=" + partitionPath;
     }
 
-    return new HoodieKey(recordKey, partitionPath);
+    return partitionPath;
+  }
+
+  String getRecordKey(GenericRecord record) {
+    String recordKey = DataSourceUtils.getNestedFieldValAsString(record, 
recordKeyField, true);
+    if (recordKey == null || recordKey.isEmpty()) {
+      throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for 
field: \"" + recordKeyField + "\" cannot be null or empty.");
+    }
+    return recordKey;
   }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java
 
b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
similarity index 87%
rename from 
hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java
rename to 
hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
index e5bdc64..c088513 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java
+++ 
b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
@@ -16,15 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.utilities.keygen;
+package org.apache.hudi.keygen;
 
 import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hudi.keygen.SimpleKeyGenerator;
-import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
+import org.apache.hudi.exception.HoodieDeltaStreamerException;
 
 import org.apache.avro.generic.GenericRecord;
 
@@ -75,7 +73,7 @@ public class TimestampBasedKeyGenerator extends 
SimpleKeyGenerator {
     private static final String TIMESTAMP_OUTPUT_DATE_FORMAT_PROP =
         "hoodie.deltastreamer.keygen.timebased.output.dateformat";
     private static final String TIMESTAMP_TIMEZONE_FORMAT_PROP =
-            "hoodie.deltastreamer.keygen.timebased.timezone";
+        "hoodie.deltastreamer.keygen.timebased.timezone";
   }
 
   public TimestampBasedKeyGenerator(TypedProperties config) {
@@ -111,6 +109,12 @@ public class TimestampBasedKeyGenerator extends 
SimpleKeyGenerator {
 
   @Override
   public HoodieKey getKey(GenericRecord record) {
+    String recordKey = getRecordKey(record);
+    String partitionPath = getPartitionPath(record, partitionPathField);
+    return new HoodieKey(recordKey, partitionPath);
+  }
+
+  String getPartitionPath(GenericRecord record, String partitionPathField) {
     Object partitionVal = DataSourceUtils.getNestedFieldVal(record, 
partitionPathField, true);
     if (partitionVal == null) {
       partitionVal = 1L;
@@ -133,14 +137,9 @@ public class TimestampBasedKeyGenerator extends 
SimpleKeyGenerator {
           "Unexpected type for partition field: " + 
partitionVal.getClass().getName());
       }
       Date timestamp = new Date(timeMs);
-      String recordKey = DataSourceUtils.getNestedFieldValAsString(record, 
recordKeyField, true);
-      if (recordKey == null || recordKey.isEmpty()) {
-        throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" 
for field: \"" + recordKeyField + "\" cannot be null or empty.");
-      }
 
-      String partitionPath = hiveStylePartitioning ? partitionPathField + "=" 
+ partitionPathFormat.format(timestamp)
-              : partitionPathFormat.format(timestamp);
-      return new HoodieKey(recordKey, partitionPath);
+      return hiveStylePartitioning ? partitionPathField + "=" + 
partitionPathFormat.format(timestamp)
+        : partitionPathFormat.format(timestamp);
     } catch (ParseException pe) {
       throw new HoodieDeltaStreamerException("Unable to parse input partition 
field :" + partitionVal, pe);
     }
diff --git 
a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java 
b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
new file mode 100644
index 0000000..bb94c25
--- /dev/null
+++ 
b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestComplexKeyGenerator extends TestKeyGeneratorUtilities {
+
+  private TypedProperties getCommonProps(boolean getComplexRecordKey) {
+    TypedProperties properties = new TypedProperties();
+    if (getComplexRecordKey) {
+      properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), 
"_row_key, pii_col");
+    } else {
+      properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), 
"_row_key");
+    }
+    properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), 
"true");
+    return properties;
+  }
+
+  private TypedProperties getPropertiesWithoutPartitionPathProp() {
+    return getCommonProps(false);
+  }
+
+  private TypedProperties getPropertiesWithoutRecordKeyProp() {
+    TypedProperties properties = new TypedProperties();
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"timestamp");
+    return properties;
+  }
+
+  private TypedProperties getWrongRecordKeyFieldProps() {
+    TypedProperties properties = new TypedProperties();
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"timestamp");
+    properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), 
"_wrong_key");
+    return properties;
+  }
+
+  private TypedProperties getProps() {
+    TypedProperties properties = getCommonProps(true);
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"timestamp,ts_ms");
+    return properties;
+  }
+
+  @Test
+  public void testNullPartitionPathFields() {
+    Assertions.assertThrows(IllegalArgumentException.class, () -> new 
ComplexKeyGenerator(getPropertiesWithoutPartitionPathProp()));
+  }
+
+  @Test
+  public void testNullRecordKeyFields() {
+    Assertions.assertThrows(IllegalArgumentException.class, () -> new 
ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+  }
+
+  @Test
+  public void testWrongRecordKeyField() {
+    ComplexKeyGenerator keyGenerator = new 
ComplexKeyGenerator(getWrongRecordKeyFieldProps());
+    Assertions.assertThrows(HoodieKeyException.class, () -> 
keyGenerator.getRecordKey(getRecord()));
+  }
+
+  @Test
+  public void testHappyFlow() {
+    ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getProps());
+    HoodieKey key = keyGenerator.getKey(getRecord());
+    Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
+    Assertions.assertEquals(key.getPartitionPath(), 
"timestamp=4357686/ts_ms=2020-03-21");
+  }
+}
diff --git 
a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java 
b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
new file mode 100644
index 0000000..699bf43
--- /dev/null
+++ 
b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestCustomKeyGenerator extends TestKeyGeneratorUtilities {
+
+  private TypedProperties getCommonProps(boolean getComplexRecordKey) {
+    TypedProperties properties = new TypedProperties();
+    if (getComplexRecordKey) {
+      properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), 
"_row_key, pii_col");
+    } else {
+      properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), 
"_row_key");
+    }
+    properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), 
"true");
+    return properties;
+  }
+
+  private TypedProperties getPropertiesForSimpleKeyGen() {
+    TypedProperties properties = getCommonProps(false);
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"timestamp:simple");
+    return properties;
+  }
+
+  private TypedProperties getImproperPartitionFieldFormatProp() {
+    TypedProperties properties = getCommonProps(false);
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"timestamp");
+    return properties;
+  }
+
+  private TypedProperties getInvalidPartitionKeyTypeProps() {
+    TypedProperties properties = getCommonProps(false);
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"timestamp:dummy");
+    return properties;
+  }
+
+  private TypedProperties getComplexRecordKeyWithSimplePartitionProps() {
+    TypedProperties properties = getCommonProps(true);
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"timestamp:simple");
+    return properties;
+  }
+
+  private TypedProperties getComplexRecordKeyAndPartitionPathProps() {
+    TypedProperties properties = getCommonProps(true);
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"timestamp:simple,ts_ms:timestamp");
+    populateNecessaryPropsForTimestampBasedKeyGen(properties);
+    return properties;
+  }
+
+  private TypedProperties getPropsWithoutRecordKeyFieldProps() {
+    TypedProperties properties = new TypedProperties();
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"timestamp:simple");
+    return properties;
+  }
+
+  private void populateNecessaryPropsForTimestampBasedKeyGen(TypedProperties 
properties) {
+    properties.put("hoodie.deltastreamer.keygen.timebased.timestamp.type", 
"DATE_STRING");
+    properties.put("hoodie.deltastreamer.keygen.timebased.input.dateformat", 
"yyyy-MM-dd");
+    properties.put("hoodie.deltastreamer.keygen.timebased.output.dateformat", 
"yyyyMMdd");
+  }
+
+  private TypedProperties getPropertiesForTimestampBasedKeyGen() {
+    TypedProperties properties = getCommonProps(false);
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"ts_ms:timestamp");
+    populateNecessaryPropsForTimestampBasedKeyGen(properties);
+    return properties;
+  }
+
+  private TypedProperties getPropertiesForNonPartitionedKeyGen() {
+    TypedProperties properties = getCommonProps(false);
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "");
+    return properties;
+  }
+
+  @Test
+  public void testSimpleKeyGenerator() {
+    KeyGenerator keyGenerator = new 
CustomKeyGenerator(getPropertiesForSimpleKeyGen());
+    HoodieKey key = keyGenerator.getKey(getRecord());
+    Assertions.assertEquals(key.getRecordKey(), "key1");
+    Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686");
+  }
+
+  @Test
+  public void testTimestampBasedKeyGenerator() {
+    KeyGenerator keyGenerator = new 
CustomKeyGenerator(getPropertiesForTimestampBasedKeyGen());
+    HoodieKey key = keyGenerator.getKey(getRecord());
+    Assertions.assertEquals(key.getRecordKey(), "key1");
+    Assertions.assertEquals(key.getPartitionPath(), "ts_ms=20200321");
+  }
+
+  @Test
+  public void testNonPartitionedKeyGenerator() {
+    KeyGenerator keyGenerator = new 
CustomKeyGenerator(getPropertiesForNonPartitionedKeyGen());
+    HoodieKey key = keyGenerator.getKey(getRecord());
+    Assertions.assertEquals(key.getRecordKey(), "key1");
+    Assertions.assertTrue(key.getPartitionPath().isEmpty());
+  }
+
+  @Test
+  public void testInvalidPartitionKeyType() {
+    try {
+      KeyGenerator keyGenerator = new 
CustomKeyGenerator(getInvalidPartitionKeyTypeProps());
+      keyGenerator.getKey(getRecord());
+      Assertions.fail("should fail when invalid PartitionKeyType is 
provided!");
+    } catch (Exception e) {
+      Assertions.assertTrue(e.getMessage().contains("No enum constant 
org.apache.hudi.keygen.CustomKeyGenerator.PartitionKeyType.DUMMY"));
+    }
+  }
+
+  @Test
+  public void testNoRecordKeyFieldProp() {
+    try {
+      KeyGenerator keyGenerator = new 
CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps());
+      keyGenerator.getKey(getRecord());
+      Assertions.fail("should fail when record key field is not provided!");
+    } catch (Exception e) {
+      Assertions.assertTrue(e.getMessage().contains("Property 
hoodie.datasource.write.recordkey.field not found"));
+    }
+  }
+
+  @Test
+  public void testPartitionFieldsInImproperFormat() {
+    try {
+      KeyGenerator keyGenerator = new 
CustomKeyGenerator(getImproperPartitionFieldFormatProp());
+      keyGenerator.getKey(getRecord());
+      Assertions.fail("should fail when partition key field is provided in 
improper format!");
+    } catch (Exception e) {
+      Assertions.assertTrue(e.getMessage().contains("Unable to find field 
names for partition path in proper format"));
+    }
+  }
+
+  @Test
+  public void testComplexRecordKeyWithSimplePartitionPath() {
+    KeyGenerator keyGenerator = new 
CustomKeyGenerator(getComplexRecordKeyWithSimplePartitionProps());
+    HoodieKey key = keyGenerator.getKey(getRecord());
+    Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
+    Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686");
+  }
+
+  @Test
+  public void testComplexRecordKeysWithComplexPartitionPath() {
+    KeyGenerator keyGenerator = new 
CustomKeyGenerator(getComplexRecordKeyAndPartitionPathProps());
+    HoodieKey key = keyGenerator.getKey(getRecord());
+    Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
+    Assertions.assertEquals(key.getPartitionPath(), 
"timestamp=4357686/ts_ms=20200321");
+  }
+}
diff --git 
a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java
 
b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java
new file mode 100644
index 0000000..e46c783
--- /dev/null
+++ 
b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestGlobalDeleteKeyGenerator extends TestKeyGeneratorUtilities {
+
+  private TypedProperties getCommonProps(boolean getComplexRecordKey) {
+    TypedProperties properties = new TypedProperties();
+    if (getComplexRecordKey) {
+      properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), 
"_row_key,pii_col");
+    } else {
+      properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), 
"_row_key");
+    }
+    properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), 
"true");
+    return properties;
+  }
+
+  private TypedProperties getPropertiesWithoutRecordKeyProp() {
+    TypedProperties properties = new TypedProperties();
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"timestamp");
+    return properties;
+  }
+
+  private TypedProperties getWrongRecordKeyFieldProps() {
+    TypedProperties properties = new TypedProperties();
+    properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), 
"_wrong_key");
+    return properties;
+  }
+
+  private TypedProperties getProps() {
+    TypedProperties properties = getCommonProps(true);
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"timestamp,ts_ms");
+    return properties;
+  }
+
+  @Test
+  public void testNullRecordKeyFields() {
+    Assertions.assertThrows(IllegalArgumentException.class, () -> new 
GlobalDeleteKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+  }
+
+  @Test
+  public void testWrongRecordKeyField() {
+    GlobalDeleteKeyGenerator keyGenerator = new 
GlobalDeleteKeyGenerator(getWrongRecordKeyFieldProps());
+    Assertions.assertThrows(HoodieKeyException.class, () -> 
keyGenerator.getRecordKey(getRecord()));
+  }
+
+  @Test
+  public void testHappyFlow() {
+    GlobalDeleteKeyGenerator keyGenerator = new 
GlobalDeleteKeyGenerator(getProps());
+    HoodieKey key = keyGenerator.getKey(getRecord());
+    Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
+    Assertions.assertEquals(key.getPartitionPath(), "");
+  }
+}
diff --git 
a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
 
b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestKeyGeneratorUtilities.java
similarity index 50%
copy from 
hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
copy to 
hudi-spark/src/test/java/org/apache/hudi/keygen/TestKeyGeneratorUtilities.java
index f5b32a0..c0d027e 100644
--- 
a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
+++ 
b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestKeyGeneratorUtilities.java
@@ -18,30 +18,23 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.exception.HoodieKeyException;
-
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 
-/**
- * Simple Key generator for unpartitioned Hive Tables.
- */
-public class NonpartitionedKeyGenerator extends SimpleKeyGenerator {
-
-  private static final String EMPTY_PARTITION = "";
+public class TestKeyGeneratorUtilities {
 
-  public NonpartitionedKeyGenerator(TypedProperties props) {
-    super(props);
-  }
+  public String exampleSchema = "{\"type\": \"record\",\"name\": 
\"testrec\",\"fields\": [ "
+      + "{\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", 
\"type\": \"string\"},"
+      + "{\"name\": \"ts_ms\", \"type\": \"string\"},"
+      + "{\"name\": \"pii_col\", \"type\": \"string\"}]}";
 
-  @Override
-  public HoodieKey getKey(GenericRecord record) {
-    String recordKey = DataSourceUtils.getNestedFieldValAsString(record, 
recordKeyField, true);
-    if (recordKey == null || recordKey.isEmpty()) {
-      throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for 
field: \"" + recordKeyField + "\" cannot be null or empty.");
-    }
-    return new HoodieKey(recordKey, EMPTY_PARTITION);
+  public GenericRecord getRecord() {
+    GenericRecord record = new GenericData.Record(new 
Schema.Parser().parse(exampleSchema));
+    record.put("timestamp", 4357686);
+    record.put("_row_key", "key1");
+    record.put("ts_ms", "2020-03-21");
+    record.put("pii_col", "pi");
+    return record;
   }
 }
diff --git 
a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java 
b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
new file mode 100644
index 0000000..f36331a
--- /dev/null
+++ 
b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestSimpleKeyGenerator extends TestKeyGeneratorUtilities {
+
+  private TypedProperties getCommonProps() {
+    TypedProperties properties = new TypedProperties();
+    properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), 
"_row_key");
+    properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), 
"true");
+    return properties;
+  }
+
+  private TypedProperties getPropertiesWithoutPartitionPathProp() {
+    return getCommonProps();
+  }
+
+  private TypedProperties getPropertiesWithoutRecordKeyProp() {
+    TypedProperties properties = new TypedProperties();
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"timestamp");
+    return properties;
+  }
+
+  private TypedProperties getWrongRecordKeyFieldProps() {
+    TypedProperties properties = new TypedProperties();
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"timestamp");
+    properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), 
"_wrong_key");
+    return properties;
+  }
+
+  private TypedProperties getComplexRecordKeyProp() {
+    TypedProperties properties = new TypedProperties();
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"timestamp");
+    properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), 
"_row_key,pii_col");
+    return properties;
+  }
+
+  private TypedProperties getProps() {
+    TypedProperties properties = getCommonProps();
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"timestamp");
+    return properties;
+  }
+
+  @Test
+  public void testNullPartitionPathFields() {
+    Assertions.assertThrows(IllegalArgumentException.class, () -> new 
SimpleKeyGenerator(getPropertiesWithoutPartitionPathProp()));
+  }
+
+  @Test
+  public void testNullRecordKeyFields() {
+    Assertions.assertThrows(IllegalArgumentException.class, () -> new 
SimpleKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+  }
+
+  @Test
+  public void testWrongRecordKeyField() {
+    SimpleKeyGenerator keyGenerator = new 
SimpleKeyGenerator(getWrongRecordKeyFieldProps());
+    Assertions.assertThrows(HoodieKeyException.class, () -> 
keyGenerator.getRecordKey(getRecord()));
+  }
+
+  @Test
+  public void testComplexRecordKeyField() {
+    SimpleKeyGenerator keyGenerator = new 
SimpleKeyGenerator(getComplexRecordKeyProp());
+    Assertions.assertThrows(HoodieKeyException.class, () -> 
keyGenerator.getRecordKey(getRecord()));
+  }
+
+  @Test
+  public void testHappyFlow() {
+    SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getProps());
+    HoodieKey key = keyGenerator.getKey(getRecord());
+    Assertions.assertEquals(key.getRecordKey(), "key1");
+    Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686");
+  }
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/keygen/TestTimestampBasedKeyGenerator.java
 
b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
similarity index 97%
rename from 
hudi-utilities/src/test/java/org/apache/hudi/utilities/keygen/TestTimestampBasedKeyGenerator.java
rename to 
hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
index 81f7751..bd8583f 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/keygen/TestTimestampBasedKeyGenerator.java
+++ 
b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.utilities.keygen;
+package org.apache.hudi.keygen;
 
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.TypedProperties;
@@ -40,13 +40,13 @@ public class TestTimestampBasedKeyGenerator {
   public void initialize() throws IOException {
     Schema schema = SchemaTestUtil.getTimestampEvolvedSchema();
     baseRecord = SchemaTestUtil
-        .generateAvroRecordFromJson(schema, 1, "001", "f1");
+      .generateAvroRecordFromJson(schema, 1, "001", "f1");
 
     properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), 
"field1");
     
properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"createTime");
     
properties.setProperty(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(),
 "false");
   }
-  
+
   private TypedProperties getBaseKeyConfig(String timestampType, String 
dateFormat, String timezone, String scalarType) {
     
properties.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.type", 
timestampType);
     
properties.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat",
 dateFormat);
@@ -55,7 +55,6 @@ public class TestTimestampBasedKeyGenerator {
     if (scalarType != null) {
       
properties.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit",
 scalarType);
     }
-
     return properties;
   }
 
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 7cb78a1..e320d67 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -352,6 +352,14 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
 
     <!-- Hive - Test -->
     <dependency>
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 0d3e90c..41efc50 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -39,7 +39,7 @@ import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.utilities.UtilHelpers;
-import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
+import org.apache.hudi.exception.HoodieDeltaStreamerException;
 import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
 import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;

Reply via email to