This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a7e6cf5 Support nested types for recordKey, partitionPath and combineKey
a7e6cf5 is described below
commit a7e6cf51974f96c6b4bc945db25658cbb4bc48c7
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Tue May 14 19:59:04 2019 -0700
Support nested types for recordKey, partitionPath and combineKey
---
.../src/main/java/com/uber/hoodie/DataSourceUtils.java | 10 +++++++++-
.../hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java | 2 +-
.../hoodie/utilities/keygen/TimestampBasedKeyGenerator.java | 4 ++--
3 files changed, 12 insertions(+), 4 deletions(-)
diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
index 19352ed..9eb8bcd 100644
--- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
@@ -53,6 +53,14 @@ public class DataSourceUtils {
* Obtain value of the provided field as string, denoted by dot notation.
e.g: a.b.c
*/
public static String getNestedFieldValAsString(GenericRecord record, String
fieldName) {
+ Object obj = getNestedFieldVal(record, fieldName);
+ return (obj == null) ? null : obj.toString();
+ }
+
+ /**
+ * Obtain value of the provided field, denoted by dot notation. e.g: a.b.c
+ */
+ public static Object getNestedFieldVal(GenericRecord record, String
fieldName) {
String[] parts = fieldName.split("\\.");
GenericRecord valueNode = record;
int i = 0;
@@ -65,7 +73,7 @@ public class DataSourceUtils {
// return, if last part of name
if (i == parts.length - 1) {
- return val.toString();
+ return val;
} else {
// VC: Need a test here
if (!(val instanceof GenericRecord)) {
diff --git
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
index acff58f..e00c923 100644
---
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
+++
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -243,7 +243,7 @@ public class HoodieDeltaStreamer implements Serializable {
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
HoodieRecordPayload payload =
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
- (Comparable) gr.get(cfg.sourceOrderingField));
+ (Comparable) DataSourceUtils.getNestedFieldVal(gr,
cfg.sourceOrderingField));
return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
});
diff --git
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java
index f6a0c90..b4c26b9 100644
---
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java
+++
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java
@@ -83,7 +83,7 @@ public class TimestampBasedKeyGenerator extends
SimpleKeyGenerator {
@Override
public HoodieKey getKey(GenericRecord record) {
- Object partitionVal = record.get(partitionPathField);
+ Object partitionVal = DataSourceUtils.getNestedFieldVal(record,
partitionPathField);
SimpleDateFormat partitionPathFormat = new
SimpleDateFormat(outputDateFormat);
partitionPathFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
@@ -102,7 +102,7 @@ public class TimestampBasedKeyGenerator extends
SimpleKeyGenerator {
"Unexpected type for partition field: " +
partitionVal.getClass().getName());
}
- return new HoodieKey(record.get(recordKeyField).toString(),
+ return new HoodieKey(DataSourceUtils.getNestedFieldValAsString(record,
recordKeyField),
partitionPathFormat.format(new Date(unixTime * 1000)));
} catch (ParseException pe) {
throw new HoodieDeltaStreamerException(