This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c7f1a78 [HUDI-728]: Implemented custom key generator (#1433)
c7f1a78 is described below
commit c7f1a781ab4ff3784d53a102364fd85e379811d1
Author: Pratyaksh Sharma <[email protected]>
AuthorDate: Thu Jul 9 17:05:07 2020 +0530
[HUDI-728]: Implemented custom key generator (#1433)
---
.../exception/HoodieDeltaStreamerException.java | 4 +-
.../apache/hudi/keygen/ComplexKeyGenerator.java | 56 ++++---
.../org/apache/hudi/keygen/CustomKeyGenerator.java | 126 +++++++++++++++
.../hudi/keygen/GlobalDeleteKeyGenerator.java | 13 +-
.../hudi/keygen/NonpartitionedKeyGenerator.java | 6 +-
.../org/apache/hudi/keygen/SimpleKeyGenerator.java | 23 +--
.../hudi}/keygen/TimestampBasedKeyGenerator.java | 25 ++-
.../hudi/keygen/TestComplexKeyGenerator.java | 88 +++++++++++
.../apache/hudi/keygen/TestCustomKeyGenerator.java | 169 +++++++++++++++++++++
.../hudi/keygen/TestGlobalDeleteKeyGenerator.java | 78 ++++++++++
.../hudi/keygen/TestKeyGeneratorUtilities.java} | 35 ++---
.../apache/hudi/keygen/TestSimpleKeyGenerator.java | 97 ++++++++++++
.../keygen/TestTimestampBasedKeyGenerator.java | 7 +-
hudi-utilities/pom.xml | 8 +
.../hudi/utilities/deltastreamer/DeltaSync.java | 2 +-
15 files changed, 649 insertions(+), 88 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieDeltaStreamerException.java b/hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java
similarity index 91%
rename from hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieDeltaStreamerException.java
rename to hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java
index 0c7165b..e939d62 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieDeltaStreamerException.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/exception/HoodieDeltaStreamerException.java
@@ -16,9 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.utilities.exception;
-
-import org.apache.hudi.exception.HoodieException;
+package org.apache.hudi.exception;
public class HoodieDeltaStreamerException extends HoodieException {
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
index 9c31286..b3ab3d0 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
@@ -49,21 +49,39 @@ public class ComplexKeyGenerator extends KeyGenerator {
public ComplexKeyGenerator(TypedProperties props) {
super(props);
- this.recordKeyFields =
Arrays.asList(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(","))
- .stream().map(String::trim).collect(Collectors.toList());
+ this.recordKeyFields =
Arrays.stream(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
this.partitionPathFields =
-
Arrays.asList(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(","))
- .stream().map(String::trim).collect(Collectors.toList());
+
Arrays.stream(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
this.hiveStylePartitioning =
props.getBoolean(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(),
Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL()));
}
@Override
public HoodieKey getKey(GenericRecord record) {
- if (recordKeyFields == null || partitionPathFields == null) {
- throw new HoodieKeyException("Unable to find field names for record key
or partition path in cfg");
+ String recordKey = getRecordKey(record);
+ StringBuilder partitionPath = new StringBuilder();
+ for (String partitionPathField : partitionPathFields) {
+ partitionPath.append(getPartitionPath(record, partitionPathField));
+ partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
+ }
+ partitionPath.deleteCharAt(partitionPath.length() - 1);
+
+ return new HoodieKey(recordKey, partitionPath.toString());
+ }
+
+ String getPartitionPath(GenericRecord record, String partitionPathField) {
+ StringBuilder partitionPath = new StringBuilder();
+ String fieldVal = DataSourceUtils.getNestedFieldValAsString(record,
partitionPathField, true);
+ if (fieldVal == null || fieldVal.isEmpty()) {
+ partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" +
DEFAULT_PARTITION_PATH
+ : DEFAULT_PARTITION_PATH);
+ } else {
+ partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" +
fieldVal : fieldVal);
}
+ return partitionPath.toString();
+ }
+ String getRecordKey(GenericRecord record) {
boolean keyIsNullEmpty = true;
StringBuilder recordKey = new StringBuilder();
for (String recordKeyField : recordKeyFields) {
@@ -80,30 +98,8 @@ public class ComplexKeyGenerator extends KeyGenerator {
recordKey.deleteCharAt(recordKey.length() - 1);
if (keyIsNullEmpty) {
throw new HoodieKeyException("recordKey values: \"" + recordKey + "\"
for fields: "
- + recordKeyFields.toString() + " cannot be entirely null or empty.");
+ + recordKeyFields.toString() + " cannot be entirely null or empty.");
}
-
- StringBuilder partitionPath = new StringBuilder();
- for (String partitionPathField : partitionPathFields) {
- String fieldVal = DataSourceUtils.getNestedFieldValAsString(record,
partitionPathField, true);
- if (fieldVal == null || fieldVal.isEmpty()) {
- partitionPath.append(hiveStylePartitioning ? partitionPathField + "="
+ DEFAULT_PARTITION_PATH
- : DEFAULT_PARTITION_PATH);
- } else {
- partitionPath.append(hiveStylePartitioning ? partitionPathField + "="
+ fieldVal : fieldVal);
- }
- partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
- }
- partitionPath.deleteCharAt(partitionPath.length() - 1);
-
- return new HoodieKey(recordKey.toString(), partitionPath.toString());
- }
-
- public List<String> getRecordKeyFields() {
- return recordKeyFields;
- }
-
- public List<String> getPartitionPathFields() {
- return partitionPathFields;
+ return recordKey.toString();
}
}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
new file mode 100644
index 0000000..be2d1ef
--- /dev/null
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.exception.HoodieDeltaStreamerException;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * This is a generic implementation of KeyGenerator where users can configure
record key as a single field or a combination of fields.
+ * Similarly partition path can be configured to have multiple fields or only
one field. This class expects value for prop
+ * "hoodie.datasource.write.partitionpath.field" in a specific format. For
example:
+ *
+ * properties.put("hoodie.datasource.write.partitionpath.field",
"field1:PartitionKeyType1,field2:PartitionKeyType2").
+ *
+ * The complete partition path is created as <value for field1 basis
PartitionKeyType1>/<value for field2 basis PartitionKeyType2> and so on.
+ *
+ * Few points to consider:
+ * 1. If you want to customize some partition path field on a timestamp basis,
you can use field1:timestampBased
+ * 2. If you simply want to have the value of your configured field in the
partition path, use field1:simple
+ * 3. If you want your table to be non partitioned, simply leave it as blank.
+ *
+ * RecordKey is internally generated using either SimpleKeyGenerator or
ComplexKeyGenerator.
+ */
+public class CustomKeyGenerator extends KeyGenerator {
+
+ protected final List<String> recordKeyFields;
+ protected final List<String> partitionPathFields;
+ protected final TypedProperties properties;
+ private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+ private static final String SPLIT_REGEX = ":";
+
+ /**
+ * Used as a part of config in CustomKeyGenerator.java.
+ */
+ public enum PartitionKeyType {
+ SIMPLE, TIMESTAMP
+ }
+
+ public CustomKeyGenerator(TypedProperties props) {
+ super(props);
+ this.properties = props;
+ this.recordKeyFields =
Arrays.stream(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
+ this.partitionPathFields =
+
Arrays.stream(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
+ }
+
+ @Override
+ public HoodieKey getKey(GenericRecord record) {
+ //call function to get the record key
+ String recordKey = getRecordKey(record);
+ //call function to get the partition key based on the type for that
partition path field
+ String partitionPath = getPartitionPath(record);
+ return new HoodieKey(recordKey, partitionPath);
+ }
+
+ private String getPartitionPath(GenericRecord record) {
+ if (partitionPathFields == null) {
+ throw new HoodieKeyException("Unable to find field names for partition
path in cfg");
+ }
+
+ String partitionPathField;
+ StringBuilder partitionPath = new StringBuilder();
+
+ //Corresponds to no partition case
+ if (partitionPathFields.size() == 1 &&
partitionPathFields.get(0).isEmpty()) {
+ return "";
+ }
+ for (String field : partitionPathFields) {
+ String[] fieldWithType = field.split(SPLIT_REGEX);
+ if (fieldWithType.length != 2) {
+ throw new HoodieKeyException("Unable to find field names for partition
path in proper format");
+ }
+
+ partitionPathField = fieldWithType[0];
+ PartitionKeyType keyType =
PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
+ switch (keyType) {
+ case SIMPLE:
+ partitionPath.append(new
SimpleKeyGenerator(properties).getPartitionPath(record, partitionPathField));
+ break;
+ case TIMESTAMP:
+ partitionPath.append(new
TimestampBasedKeyGenerator(properties).getPartitionPath(record,
partitionPathField));
+ break;
+ default:
+ throw new HoodieDeltaStreamerException("Please provide valid
PartitionKeyType with fields! You provided: " + keyType);
+ }
+
+ partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
+ }
+ partitionPath.deleteCharAt(partitionPath.length() - 1);
+
+ return partitionPath.toString();
+ }
+
+ private String getRecordKey(GenericRecord record) {
+ if (recordKeyFields == null || recordKeyFields.isEmpty()) {
+ throw new HoodieKeyException("Unable to find field names for record key
in cfg");
+ }
+
+ return recordKeyFields.size() == 1 ? new
SimpleKeyGenerator(properties).getRecordKey(record) : new
ComplexKeyGenerator(properties).getRecordKey(record);
+ }
+}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
index 315c265..37b0529 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
@@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericRecord;
import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
/**
* Key generator for deletes using global indices. Global index deletes do not
require partition value
@@ -43,15 +44,15 @@ public class GlobalDeleteKeyGenerator extends KeyGenerator {
public GlobalDeleteKeyGenerator(TypedProperties config) {
super(config);
- this.recordKeyFields =
Arrays.asList(config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(","));
+ this.recordKeyFields =
Arrays.stream(config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
}
@Override
public HoodieKey getKey(GenericRecord record) {
- if (recordKeyFields == null) {
- throw new HoodieKeyException("Unable to find field names for record key
or partition path in cfg");
- }
+ return new HoodieKey(getRecordKey(record), EMPTY_PARTITION);
+ }
+ String getRecordKey(GenericRecord record) {
boolean keyIsNullEmpty = true;
StringBuilder recordKey = new StringBuilder();
for (String recordKeyField : recordKeyFields) {
@@ -68,9 +69,9 @@ public class GlobalDeleteKeyGenerator extends KeyGenerator {
recordKey.deleteCharAt(recordKey.length() - 1);
if (keyIsNullEmpty) {
throw new HoodieKeyException("recordKey values: \"" + recordKey + "\"
for fields: "
- + recordKeyFields.toString() + " cannot be entirely null or empty.");
+ + recordKeyFields.toString() + " cannot be entirely null or empty.");
}
- return new HoodieKey(recordKey.toString(), EMPTY_PARTITION);
+ return recordKey.toString();
}
}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
index f5b32a0..e790b46 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
@@ -18,6 +18,7 @@
package org.apache.hudi.keygen;
+import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
@@ -28,12 +29,15 @@ import org.apache.avro.generic.GenericRecord;
/**
* Simple Key generator for unpartitioned Hive Tables.
*/
-public class NonpartitionedKeyGenerator extends SimpleKeyGenerator {
+public class NonpartitionedKeyGenerator extends KeyGenerator {
private static final String EMPTY_PARTITION = "";
+ protected final String recordKeyField;
+
public NonpartitionedKeyGenerator(TypedProperties props) {
super(props);
+ this.recordKeyField =
props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY());
}
@Override
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
index dde321d..e9b9396 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
@@ -49,15 +49,12 @@ public class SimpleKeyGenerator extends KeyGenerator {
@Override
public HoodieKey getKey(GenericRecord record) {
- if (recordKeyField == null || partitionPathField == null) {
- throw new HoodieKeyException("Unable to find field names for record key
or partition path in cfg");
- }
-
- String recordKey = DataSourceUtils.getNestedFieldValAsString(record,
recordKeyField, true);
- if (recordKey == null || recordKey.isEmpty()) {
- throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for
field: \"" + recordKeyField + "\" cannot be null or empty.");
- }
+ String recordKey = getRecordKey(record);
+ String partitionPath = getPartitionPath(record, partitionPathField);
+ return new HoodieKey(recordKey, partitionPath);
+ }
+ String getPartitionPath(GenericRecord record, String partitionPathField) {
String partitionPath = DataSourceUtils.getNestedFieldValAsString(record,
partitionPathField, true);
if (partitionPath == null || partitionPath.isEmpty()) {
partitionPath = DEFAULT_PARTITION_PATH;
@@ -66,6 +63,14 @@ public class SimpleKeyGenerator extends KeyGenerator {
partitionPath = partitionPathField + "=" + partitionPath;
}
- return new HoodieKey(recordKey, partitionPath);
+ return partitionPath;
+ }
+
+ String getRecordKey(GenericRecord record) {
+ String recordKey = DataSourceUtils.getNestedFieldValAsString(record,
recordKeyField, true);
+ if (recordKey == null || recordKey.isEmpty()) {
+ throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for
field: \"" + recordKeyField + "\" cannot be null or empty.");
+ }
+ return recordKey;
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
similarity index 87%
rename from hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java
rename to hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
index e5bdc64..c088513 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
@@ -16,15 +16,13 @@
* limitations under the License.
*/
-package org.apache.hudi.utilities.keygen;
+package org.apache.hudi.keygen;
import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hudi.keygen.SimpleKeyGenerator;
-import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
+import org.apache.hudi.exception.HoodieDeltaStreamerException;
import org.apache.avro.generic.GenericRecord;
@@ -75,7 +73,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
private static final String TIMESTAMP_OUTPUT_DATE_FORMAT_PROP =
"hoodie.deltastreamer.keygen.timebased.output.dateformat";
private static final String TIMESTAMP_TIMEZONE_FORMAT_PROP =
- "hoodie.deltastreamer.keygen.timebased.timezone";
+ "hoodie.deltastreamer.keygen.timebased.timezone";
}
public TimestampBasedKeyGenerator(TypedProperties config) {
@@ -111,6 +109,12 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
@Override
public HoodieKey getKey(GenericRecord record) {
+ String recordKey = getRecordKey(record);
+ String partitionPath = getPartitionPath(record, partitionPathField);
+ return new HoodieKey(recordKey, partitionPath);
+ }
+
+ String getPartitionPath(GenericRecord record, String partitionPathField) {
Object partitionVal = DataSourceUtils.getNestedFieldVal(record,
partitionPathField, true);
if (partitionVal == null) {
partitionVal = 1L;
@@ -133,14 +137,9 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
"Unexpected type for partition field: " +
partitionVal.getClass().getName());
}
Date timestamp = new Date(timeMs);
- String recordKey = DataSourceUtils.getNestedFieldValAsString(record,
recordKeyField, true);
- if (recordKey == null || recordKey.isEmpty()) {
- throw new HoodieKeyException("recordKey value: \"" + recordKey + "\"
for field: \"" + recordKeyField + "\" cannot be null or empty.");
- }
- String partitionPath = hiveStylePartitioning ? partitionPathField + "="
+ partitionPathFormat.format(timestamp)
- : partitionPathFormat.format(timestamp);
- return new HoodieKey(recordKey, partitionPath);
+ return hiveStylePartitioning ? partitionPathField + "=" +
partitionPathFormat.format(timestamp)
+ : partitionPathFormat.format(timestamp);
} catch (ParseException pe) {
throw new HoodieDeltaStreamerException("Unable to parse input partition
field :" + partitionVal, pe);
}
diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
new file mode 100644
index 0000000..bb94c25
--- /dev/null
+++ b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestComplexKeyGenerator extends TestKeyGeneratorUtilities {
+
+ private TypedProperties getCommonProps(boolean getComplexRecordKey) {
+ TypedProperties properties = new TypedProperties();
+ if (getComplexRecordKey) {
+ properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(),
"_row_key, pii_col");
+ } else {
+ properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(),
"_row_key");
+ }
+ properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(),
"true");
+ return properties;
+ }
+
+ private TypedProperties getPropertiesWithoutPartitionPathProp() {
+ return getCommonProps(false);
+ }
+
+ private TypedProperties getPropertiesWithoutRecordKeyProp() {
+ TypedProperties properties = new TypedProperties();
+ properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(),
"timestamp");
+ return properties;
+ }
+
+ private TypedProperties getWrongRecordKeyFieldProps() {
+ TypedProperties properties = new TypedProperties();
+ properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(),
"timestamp");
+ properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(),
"_wrong_key");
+ return properties;
+ }
+
+ private TypedProperties getProps() {
+ TypedProperties properties = getCommonProps(true);
+ properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(),
"timestamp,ts_ms");
+ return properties;
+ }
+
+ @Test
+ public void testNullPartitionPathFields() {
+ Assertions.assertThrows(IllegalArgumentException.class, () -> new
ComplexKeyGenerator(getPropertiesWithoutPartitionPathProp()));
+ }
+
+ @Test
+ public void testNullRecordKeyFields() {
+ Assertions.assertThrows(IllegalArgumentException.class, () -> new
ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+ }
+
+ @Test
+ public void testWrongRecordKeyField() {
+ ComplexKeyGenerator keyGenerator = new
ComplexKeyGenerator(getWrongRecordKeyFieldProps());
+ Assertions.assertThrows(HoodieKeyException.class, () ->
keyGenerator.getRecordKey(getRecord()));
+ }
+
+ @Test
+ public void testHappyFlow() {
+ ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getProps());
+ HoodieKey key = keyGenerator.getKey(getRecord());
+ Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
+ Assertions.assertEquals(key.getPartitionPath(),
"timestamp=4357686/ts_ms=2020-03-21");
+ }
+}
diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
new file mode 100644
index 0000000..699bf43
--- /dev/null
+++ b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestCustomKeyGenerator extends TestKeyGeneratorUtilities {
+
+ private TypedProperties getCommonProps(boolean getComplexRecordKey) {
+ TypedProperties properties = new TypedProperties();
+ if (getComplexRecordKey) {
+ properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(),
"_row_key, pii_col");
+ } else {
+ properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(),
"_row_key");
+ }
+ properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(),
"true");
+ return properties;
+ }
+
+ private TypedProperties getPropertiesForSimpleKeyGen() {
+ TypedProperties properties = getCommonProps(false);
+ properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(),
"timestamp:simple");
+ return properties;
+ }
+
+ private TypedProperties getImproperPartitionFieldFormatProp() {
+ TypedProperties properties = getCommonProps(false);
+ properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(),
"timestamp");
+ return properties;
+ }
+
+ private TypedProperties getInvalidPartitionKeyTypeProps() {
+ TypedProperties properties = getCommonProps(false);
+ properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(),
"timestamp:dummy");
+ return properties;
+ }
+
+ private TypedProperties getComplexRecordKeyWithSimplePartitionProps() {
+ TypedProperties properties = getCommonProps(true);
+ properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(),
"timestamp:simple");
+ return properties;
+ }
+
+ private TypedProperties getComplexRecordKeyAndPartitionPathProps() {
+ TypedProperties properties = getCommonProps(true);
+ properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(),
"timestamp:simple,ts_ms:timestamp");
+ populateNecessaryPropsForTimestampBasedKeyGen(properties);
+ return properties;
+ }
+
+ private TypedProperties getPropsWithoutRecordKeyFieldProps() {
+ TypedProperties properties = new TypedProperties();
+ properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(),
"timestamp:simple");
+ return properties;
+ }
+
+ private void populateNecessaryPropsForTimestampBasedKeyGen(TypedProperties
properties) {
+ properties.put("hoodie.deltastreamer.keygen.timebased.timestamp.type",
"DATE_STRING");
+ properties.put("hoodie.deltastreamer.keygen.timebased.input.dateformat",
"yyyy-MM-dd");
+ properties.put("hoodie.deltastreamer.keygen.timebased.output.dateformat",
"yyyyMMdd");
+ }
+
+ private TypedProperties getPropertiesForTimestampBasedKeyGen() {
+ TypedProperties properties = getCommonProps(false);
+ properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(),
"ts_ms:timestamp");
+ populateNecessaryPropsForTimestampBasedKeyGen(properties);
+ return properties;
+ }
+
+ private TypedProperties getPropertiesForNonPartitionedKeyGen() {
+ TypedProperties properties = getCommonProps(false);
+ properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "");
+ return properties;
+ }
+
+ @Test
+ public void testSimpleKeyGenerator() {
+ KeyGenerator keyGenerator = new
CustomKeyGenerator(getPropertiesForSimpleKeyGen());
+ HoodieKey key = keyGenerator.getKey(getRecord());
+ Assertions.assertEquals(key.getRecordKey(), "key1");
+ Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686");
+ }
+
+ @Test
+ public void testTimestampBasedKeyGenerator() {
+ KeyGenerator keyGenerator = new
CustomKeyGenerator(getPropertiesForTimestampBasedKeyGen());
+ HoodieKey key = keyGenerator.getKey(getRecord());
+ Assertions.assertEquals(key.getRecordKey(), "key1");
+ Assertions.assertEquals(key.getPartitionPath(), "ts_ms=20200321");
+ }
+
+ @Test
+ public void testNonPartitionedKeyGenerator() {
+ KeyGenerator keyGenerator = new
CustomKeyGenerator(getPropertiesForNonPartitionedKeyGen());
+ HoodieKey key = keyGenerator.getKey(getRecord());
+ Assertions.assertEquals(key.getRecordKey(), "key1");
+ Assertions.assertTrue(key.getPartitionPath().isEmpty());
+ }
+
+ @Test
+ public void testInvalidPartitionKeyType() {
+ try {
+ KeyGenerator keyGenerator = new
CustomKeyGenerator(getInvalidPartitionKeyTypeProps());
+ keyGenerator.getKey(getRecord());
+ Assertions.fail("should fail when invalid PartitionKeyType is
provided!");
+ } catch (Exception e) {
+ Assertions.assertTrue(e.getMessage().contains("No enum constant
org.apache.hudi.keygen.CustomKeyGenerator.PartitionKeyType.DUMMY"));
+ }
+ }
+
+ @Test
+ public void testNoRecordKeyFieldProp() {
+ try {
+ KeyGenerator keyGenerator = new
CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps());
+ keyGenerator.getKey(getRecord());
+ Assertions.fail("should fail when record key field is not provided!");
+ } catch (Exception e) {
+ Assertions.assertTrue(e.getMessage().contains("Property
hoodie.datasource.write.recordkey.field not found"));
+ }
+ }
+
+ @Test
+ public void testPartitionFieldsInImproperFormat() {
+ try {
+ KeyGenerator keyGenerator = new
CustomKeyGenerator(getImproperPartitionFieldFormatProp());
+ keyGenerator.getKey(getRecord());
+ Assertions.fail("should fail when partition key field is provided in
improper format!");
+ } catch (Exception e) {
+ Assertions.assertTrue(e.getMessage().contains("Unable to find field
names for partition path in proper format"));
+ }
+ }
+
+ @Test
+ public void testComplexRecordKeyWithSimplePartitionPath() {
+ KeyGenerator keyGenerator = new
CustomKeyGenerator(getComplexRecordKeyWithSimplePartitionProps());
+ HoodieKey key = keyGenerator.getKey(getRecord());
+ Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
+ Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686");
+ }
+
+ @Test
+ public void testComplexRecordKeysWithComplexPartitionPath() {
+ KeyGenerator keyGenerator = new
CustomKeyGenerator(getComplexRecordKeyAndPartitionPathProps());
+ HoodieKey key = keyGenerator.getKey(getRecord());
+ Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
+ Assertions.assertEquals(key.getPartitionPath(),
"timestamp=4357686/ts_ms=20200321");
+ }
+}
diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java
new file mode 100644
index 0000000..e46c783
--- /dev/null
+++ b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link GlobalDeleteKeyGenerator}. They cover a missing record key property, a
+ * record key field absent from the record, and the happy path where the partition path is
+ * expected to be empty.
+ */
+public class TestGlobalDeleteKeyGenerator extends TestKeyGeneratorUtilities {
+
+  /**
+   * Builds properties with a record key field and hive-style partitioning enabled.
+   *
+   * @param getComplexRecordKey when true, uses the composite key {@code _row_key,pii_col};
+   *                            otherwise uses {@code _row_key}
+   * @return the common properties
+   */
+  private TypedProperties getCommonProps(boolean getComplexRecordKey) {
+    TypedProperties properties = new TypedProperties();
+    if (getComplexRecordKey) {
+      properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,pii_col");
+    } else {
+      properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
+    }
+    properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true");
+    return properties;
+  }
+
+  /** Properties that set only a partition path field and omit the record key field. */
+  private TypedProperties getPropertiesWithoutRecordKeyProp() {
+    TypedProperties properties = new TypedProperties();
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
+    return properties;
+  }
+
+  /** Properties whose record key field ({@code _wrong_key}) does not exist in the test schema. */
+  private TypedProperties getWrongRecordKeyFieldProps() {
+    TypedProperties properties = new TypedProperties();
+    properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_wrong_key");
+    return properties;
+  }
+
+  /** Composite record key plus two partition path fields. */
+  private TypedProperties getProps() {
+    TypedProperties properties = getCommonProps(true);
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp,ts_ms");
+    return properties;
+  }
+
+  @Test
+  public void testNullRecordKeyFields() {
+    Assertions.assertThrows(IllegalArgumentException.class,
+        () -> new GlobalDeleteKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+  }
+
+  @Test
+  public void testWrongRecordKeyField() {
+    GlobalDeleteKeyGenerator keyGenerator = new GlobalDeleteKeyGenerator(getWrongRecordKeyFieldProps());
+    Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
+  }
+
+  @Test
+  public void testHappyFlow() {
+    GlobalDeleteKeyGenerator keyGenerator = new GlobalDeleteKeyGenerator(getProps());
+    HoodieKey key = keyGenerator.getKey(getRecord());
+    // JUnit's assertEquals takes (expected, actual); reversing them inverts the failure message.
+    Assertions.assertEquals("_row_key:key1,pii_col:pi", key.getRecordKey());
+    // Partition path fields are configured in getProps(), yet an empty partition path is expected.
+    Assertions.assertEquals("", key.getPartitionPath());
+  }
+}
diff --git
a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestKeyGeneratorUtilities.java
similarity index 50%
copy from
hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
copy to
hudi-spark/src/test/java/org/apache/hudi/keygen/TestKeyGeneratorUtilities.java
index f5b32a0..c0d027e 100644
---
a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
+++
b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestKeyGeneratorUtilities.java
@@ -18,30 +18,23 @@
package org.apache.hudi.keygen;
-import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.exception.HoodieKeyException;
-
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
-/**
- * Simple Key generator for unpartitioned Hive Tables.
- */
-public class NonpartitionedKeyGenerator extends SimpleKeyGenerator {
-
- private static final String EMPTY_PARTITION = "";
+/**
+ * Shared fixtures for key generator tests: an example Avro schema and a record populated against it.
+ */
+public class TestKeyGeneratorUtilities {
-  public NonpartitionedKeyGenerator(TypedProperties props) {
-    super(props);
-  }
+  /**
+   * Avro record schema with a {@code long} field {@code timestamp} and {@code string} fields
+   * {@code _row_key}, {@code ts_ms} and {@code pii_col}.
+   */
+  public String exampleSchema = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
+      + "{\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
+      + "{\"name\": \"ts_ms\", \"type\": \"string\"},"
+      + "{\"name\": \"pii_col\", \"type\": \"string\"}]}";
-  @Override
-  public HoodieKey getKey(GenericRecord record) {
-    String recordKey = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField, true);
-    if (recordKey == null || recordKey.isEmpty()) {
-      throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty.");
-    }
-    return new HoodieKey(recordKey, EMPTY_PARTITION);
+  /**
+   * Builds a fresh record conforming to {@link #exampleSchema}.
+   *
+   * @return a record with {@code timestamp=4357686}, {@code _row_key=key1}, {@code ts_ms=2020-03-21}
+   *     and {@code pii_col=pi}
+   */
+  public GenericRecord getRecord() {
+    GenericRecord record = new GenericData.Record(new Schema.Parser().parse(exampleSchema));
+    // "timestamp" is declared as Avro long, so store a Long rather than an Integer to keep the
+    // fixture type-correct for validation and serialization.
+    record.put("timestamp", 4357686L);
+    record.put("_row_key", "key1");
+    record.put("ts_ms", "2020-03-21");
+    record.put("pii_col", "pi");
+    return record;
   }
 }
diff --git
a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
new file mode 100644
index 0000000..f36331a
--- /dev/null
+++
b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link SimpleKeyGenerator}. They cover missing partition path and record key
+ * properties, record key fields absent from the record, a comma-separated record key, and the happy
+ * path.
+ */
+public class TestSimpleKeyGenerator extends TestKeyGeneratorUtilities {
+
+  /** Single record key field {@code _row_key} with hive-style partitioning enabled. */
+  private TypedProperties getCommonProps() {
+    TypedProperties properties = new TypedProperties();
+    properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
+    properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true");
+    return properties;
+  }
+
+  /** The common properties, which do not set a partition path field. */
+  private TypedProperties getPropertiesWithoutPartitionPathProp() {
+    return getCommonProps();
+  }
+
+  /** Properties that set only a partition path field and omit the record key field. */
+  private TypedProperties getPropertiesWithoutRecordKeyProp() {
+    TypedProperties properties = new TypedProperties();
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
+    return properties;
+  }
+
+  /** Properties whose record key field ({@code _wrong_key}) does not exist in the test schema. */
+  private TypedProperties getWrongRecordKeyFieldProps() {
+    TypedProperties properties = new TypedProperties();
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
+    properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_wrong_key");
+    return properties;
+  }
+
+  /** Properties with a comma-separated record key, which the simple generator is expected to reject. */
+  private TypedProperties getComplexRecordKeyProp() {
+    TypedProperties properties = new TypedProperties();
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
+    properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,pii_col");
+    return properties;
+  }
+
+  /** Valid properties: single record key plus a {@code timestamp} partition path field. */
+  private TypedProperties getProps() {
+    TypedProperties properties = getCommonProps();
+    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
+    return properties;
+  }
+
+  @Test
+  public void testNullPartitionPathFields() {
+    Assertions.assertThrows(IllegalArgumentException.class,
+        () -> new SimpleKeyGenerator(getPropertiesWithoutPartitionPathProp()));
+  }
+
+  @Test
+  public void testNullRecordKeyFields() {
+    Assertions.assertThrows(IllegalArgumentException.class,
+        () -> new SimpleKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+  }
+
+  @Test
+  public void testWrongRecordKeyField() {
+    SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getWrongRecordKeyFieldProps());
+    Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
+  }
+
+  @Test
+  public void testComplexRecordKeyField() {
+    SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getComplexRecordKeyProp());
+    Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
+  }
+
+  @Test
+  public void testHappyFlow() {
+    SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getProps());
+    HoodieKey key = keyGenerator.getKey(getRecord());
+    // JUnit's assertEquals takes (expected, actual); reversing them inverts the failure message.
+    Assertions.assertEquals("key1", key.getRecordKey());
+    Assertions.assertEquals("timestamp=4357686", key.getPartitionPath());
+  }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/keygen/TestTimestampBasedKeyGenerator.java
b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
similarity index 97%
rename from
hudi-utilities/src/test/java/org/apache/hudi/utilities/keygen/TestTimestampBasedKeyGenerator.java
rename to
hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
index 81f7751..bd8583f 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/keygen/TestTimestampBasedKeyGenerator.java
+++
b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.utilities.keygen;
+package org.apache.hudi.keygen;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
@@ -40,13 +40,13 @@ public class TestTimestampBasedKeyGenerator {
public void initialize() throws IOException {
Schema schema = SchemaTestUtil.getTimestampEvolvedSchema();
baseRecord = SchemaTestUtil
- .generateAvroRecordFromJson(schema, 1, "001", "f1");
+ .generateAvroRecordFromJson(schema, 1, "001", "f1");
properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(),
"field1");
properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(),
"createTime");
properties.setProperty(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(),
"false");
}
-
+
private TypedProperties getBaseKeyConfig(String timestampType, String
dateFormat, String timezone, String scalarType) {
properties.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.type",
timestampType);
properties.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat",
dateFormat);
@@ -55,7 +55,6 @@ public class TestTimestampBasedKeyGenerator {
if (scalarType != null) {
properties.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit",
scalarType);
}
-
return properties;
}
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 7cb78a1..e320d67 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -352,6 +352,14 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-spark_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<!-- Hive - Test -->
<dependency>
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 0d3e90c..41efc50 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -39,7 +39,7 @@ import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.utilities.UtilHelpers;
-import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
+import org.apache.hudi.exception.HoodieDeltaStreamerException;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;